Author: markt
Date: Tue Sep 3 20:26:59 2013
New Revision: 1519838

URL: http://svn.apache.org/r1519838
Log:
When using Servlet 3.1 non-blocking reads with an AJP connector, make the reads between the AJP body messages from the server non-blocking. Once any part of a message is read, the remainder of the message will be read using blocking IO.

Added: tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Tue Sep 3 20:26:59 2013 @@ -173,4 +173,13 @@ public abstract class AbstractProcessor< @Override public abstract HttpUpgradeHandler getHttpUpgradeHandler(); + + + /** + * Register the socket for the specified events. 
+ * + * @param read Register the socket for read events + * @param write Register the socket for write events + */ + protected abstract void registerForEvent(boolean read, boolean write); } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Tue Sep 3 20:26:59 2013 @@ -212,6 +212,13 @@ public abstract class AbstractAjpProcess /** + * Indicates that a 'get body chunk' message has been sent but the body + * chunk has not yet been received. + */ + private boolean waitingForBodyMessage = false; + + + /** * Replay read. */ protected boolean replay = false; @@ -542,23 +549,34 @@ public abstract class AbstractAjpProcess sm.getString("ajpprocessor.comet.notsupported")); } else if (actionCode == ActionCode.AVAILABLE) { - // Web Server only sends data when asked so unless end of stream has - // been reached, there should be data available. - // TODO Figure out if a 'true' non-blocking approach is possible - // for AJP and what changes would be required to support it. if (!endOfStream) { - request.setAvailable(1); + if (empty) { + try { + refillReadBuffer(false); + } catch (IOException e) { + error = true; + return; + } + } + if (empty) { + request.setAvailable(0); + } else { + request.setAvailable(1); + } + } + + } else if (actionCode == ActionCode.NB_READ_INTEREST) { + if (!endOfStream) { + registerForEvent(true, false); } } else if (actionCode == ActionCode.NB_WRITE_INTEREST) { + // TODO // Until 'true' non-blocking IO is implemented, assume it is always // possible write data. 
AtomicBoolean isReady = (AtomicBoolean)param; isReady.set(true); - } else if (actionCode == ActionCode.NB_READ_INTEREST) { - // NO-OP. Not required until 'true' non-blocking IO is implemented. - } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) { AtomicBoolean result = (AtomicBoolean) param; result.set(endOfStream); @@ -814,6 +832,7 @@ public abstract class AbstractAjpProcess // Recycle Request object first = true; endOfStream = false; + waitingForBodyMessage = false; empty = true; replay = false; finished = false; @@ -866,16 +885,26 @@ public abstract class AbstractAjpProcess throws IOException; // Methods used by SocketInputBuffer - /** Receive a chunk of data. Called to implement the - * 'special' packet in ajp13 and to receive the data - * after we send a GET_BODY packet + /** + * Read an AJP body message. Used to read both the 'special' packet in ajp13 + * and to receive the data after we send a GET_BODY packet. + * + * @param block If there is no data available to read when this method is + * called, should this call block until data becomes available? + * + * @return <code>true</code> if at least one body byte was read, otherwise + * <code>false</code> */ - protected boolean receive() throws IOException { + protected boolean receive(boolean block) throws IOException { - first = false; bodyMessage.reset(); - readMessage(bodyMessage, true); + if (!readMessage(bodyMessage, block)) { + return false; + } + + waitingForBodyMessage = false; + first = false; // No data received. if (bodyMessage.getLen() == 0) { @@ -960,7 +989,7 @@ public abstract class AbstractAjpProcess * * @return true if there is more data, false if not. */ - protected boolean refillReadBuffer() throws IOException { + protected boolean refillReadBuffer(boolean block) throws IOException { // If the server returns an empty packet, assume that that end of // the stream has been reached (yuck -- fix protocol??). 
// FORM support @@ -972,10 +1001,13 @@ public abstract class AbstractAjpProcess } // Request more data immediately - output(getBodyMessageArray, 0, getBodyMessageArray.length); + if (!first && !waitingForBodyMessage) { + output(getBodyMessageArray, 0, getBodyMessageArray.length); + waitingForBodyMessage = true; + } - boolean moreData = receive(); - if( !moreData ) { + boolean moreData = receive(block); + if (!first && !waitingForBodyMessage && !moreData) { endOfStream = true; } return moreData; @@ -1402,8 +1434,8 @@ public abstract class AbstractAjpProcess finished = true; // Swallow the unread body packet if present - if (first && request.getContentLengthLong() > 0) { - receive(); + if (first && request.getContentLengthLong() > 0 || waitingForBodyMessage) { + receive(true); } // Add the end message @@ -1424,24 +1456,22 @@ public abstract class AbstractAjpProcess */ protected class SocketInputBuffer implements InputBuffer { - /** * Read bytes into the specified chunk. */ @Override - public int doRead(ByteChunk chunk, Request req) - throws IOException { + public int doRead(ByteChunk chunk, Request req) throws IOException { if (endOfStream) { return -1; } if (first && req.getContentLengthLong() > 0) { // Handle special first-body-chunk - if (!receive()) { + if (!receive(true)) { return 0; } } else if (empty) { - if (!refillReadBuffer()) { + if (!refillReadBuffer(true)) { return -1; } } @@ -1449,9 +1479,7 @@ public abstract class AbstractAjpProcess chunk.setBytes(bc.getBuffer(), bc.getStart(), bc.getLength()); empty = true; return chunk.getLength(); - } - } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java (original) +++ 
tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java Tue Sep 3 20:26:59 2013 @@ -73,6 +73,12 @@ public class AjpAprProcessor extends Abs @Override + protected void registerForEvent(boolean read, boolean write) { + ((AprEndpoint) endpoint).getPoller().add( + socketWrapper.getSocket().longValue(), -1, read, write); + } + + @Override protected void resetTimeouts() { // NO-OP. The AJP APR/native connector only uses the timeout value on // time SocketWrapper for async timeouts. Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Tue Sep 3 20:26:59 2013 @@ -19,6 +19,7 @@ package org.apache.coyote.ajp; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import org.apache.juli.logging.Log; @@ -57,6 +58,23 @@ public class AjpNioProcessor extends Abs @Override + protected void registerForEvent(boolean read, boolean write) { + final NioEndpoint.KeyAttachment attach = + (NioEndpoint.KeyAttachment)socketWrapper.getSocket().getAttachment( + false); + if (attach == null) { + return; + } + if (read) { + attach.interestOps(attach.interestOps() | SelectionKey.OP_READ); + } + if (write) { + attach.interestOps(attach.interestOps() | SelectionKey.OP_WRITE); + } + } + + + @Override protected void resetTimeouts() { // The NIO connector uses the timeout configured on the wrapper in the // poller. 
Therefore, it needs to be reset once asycn processing has Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Sep 3 20:26:59 2013 @@ -70,6 +70,11 @@ public class AjpProcessor extends Abstra @Override + protected void registerForEvent(boolean read, boolean write) { + // NO-OP for BIO + } + + @Override protected void resetTimeouts() { // NO-OP. The AJP BIO connector only uses the timeout value on the // SocketWrapper for async timeouts. Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Tue Sep 3 20:26:59 2013 @@ -1109,15 +1109,6 @@ public abstract class AbstractHttp11Proc /** - * Register the socket for the specified events. - * - * @param read Register the socket for read events - * @param write Register the socket for write events - */ - protected abstract void registerForEvent(boolean read, boolean write); - - - /** * After reading the request headers, we have to setup the request filters. 
*/ protected void prepareRequest() { Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Sep 3 20:26:59 2013 @@ -496,6 +496,11 @@ public class SpdyProcessor<S> extends Ab return null; } + @Override + protected void registerForEvent(boolean read, boolean write) { + // NO-OP + } + public void onSynStream(SpdyStream str) throws IOException { this.spdyStream = str; SpdyFrame frame = str.reqFrame; Added: tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1519838&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java (added) +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java Tue Sep 3 20:26:59 2013 @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.nonblocking; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpServletResponse; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.nonblocking.TestNonBlockingAPI.DataWriter; +import org.apache.catalina.startup.TomcatBaseTest; +import org.apache.tomcat.util.buf.ByteChunk; + +public class TesterAjpNonBlockingClient extends TomcatBaseTest { + + /** + * This is not a standard unit test. This is a test client for AJP + * non-blocking reads. It assumes that there is an httpd instance listening + * on localhost:80 that is redirecting all traffic to a default Tomcat 8 + * instance that includes the examples web application. + */ + @Test + public void doTestAJPNonBlockingRead() throws Exception { + + Map<String, List<String>> resHeaders = new HashMap<>(); + ByteChunk out = new ByteChunk(); + int rc = postUrl(true, new DataWriter(2000), "http://localhost" + + "/examples/servlets/nonblocking/bytecounter", + out, resHeaders, null); + + System.out.println(out.toString()); + + Assert.assertEquals(HttpServletResponse.SC_OK, rc); + } +} Propchange: tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org