Author: markt
Date: Tue Sep 3 20:26:59 2013
New Revision: 1519838
URL: http://svn.apache.org/r1519838
Log:
When using Servlet 3.1 non-blocking reads with an AJP connector, make the reads
between the AJP body messages from the server non-blocking. Once any part of a
message is read, the remainder of the message will be read using blocking IO.
Added:
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
(with props)
Modified:
tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Tue Sep 3 20:26:59 2013
@@ -173,4 +173,13 @@ public abstract class AbstractProcessor<
@Override
public abstract HttpUpgradeHandler getHttpUpgradeHandler();
+
+
+ /**
+ * Register the socket for the specified events.
+ *
+ * @param read Register the socket for read events
+ * @param write Register the socket for write events
+ */
+ protected abstract void registerForEvent(boolean read, boolean write);
}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Tue Sep 3 20:26:59 2013
@@ -212,6 +212,13 @@ public abstract class AbstractAjpProcess
/**
+ * Indicates that a 'get body chunk' message has been sent but the body
+ * chunk has not yet been received.
+ */
+ private boolean waitingForBodyMessage = false;
+
+
+ /**
* Replay read.
*/
protected boolean replay = false;
@@ -542,23 +549,34 @@ public abstract class AbstractAjpProcess
sm.getString("ajpprocessor.comet.notsupported"));
} else if (actionCode == ActionCode.AVAILABLE) {
- // Web Server only sends data when asked so unless end of stream has
- // been reached, there should be data available.
- // TODO Figure out if a 'true' non-blocking approach is possible
- // for AJP and what changes would be required to support it.
if (!endOfStream) {
- request.setAvailable(1);
+ if (empty) {
+ try {
+ refillReadBuffer(false);
+ } catch (IOException e) {
+ error = true;
+ return;
+ }
+ }
+ if (empty) {
+ request.setAvailable(0);
+ } else {
+ request.setAvailable(1);
+ }
+ }
+
+ } else if (actionCode == ActionCode.NB_READ_INTEREST) {
+ if (!endOfStream) {
+ registerForEvent(true, false);
}
} else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
+ // TODO
// Until 'true' non-blocking IO is implemented, assume it is always
// possible write data.
AtomicBoolean isReady = (AtomicBoolean)param;
isReady.set(true);
- } else if (actionCode == ActionCode.NB_READ_INTEREST) {
- // NO-OP. Not required until 'true' non-blocking IO is implemented.
-
} else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
AtomicBoolean result = (AtomicBoolean) param;
result.set(endOfStream);
@@ -814,6 +832,7 @@ public abstract class AbstractAjpProcess
// Recycle Request object
first = true;
endOfStream = false;
+ waitingForBodyMessage = false;
empty = true;
replay = false;
finished = false;
@@ -866,16 +885,26 @@ public abstract class AbstractAjpProcess
throws IOException;
// Methods used by SocketInputBuffer
- /** Receive a chunk of data. Called to implement the
- * 'special' packet in ajp13 and to receive the data
- * after we send a GET_BODY packet
+ /**
+ * Read an AJP body message. Used to read both the 'special' packet in ajp13
+ * and to receive the data after we send a GET_BODY packet.
+ *
+ * @param block If there is no data available to read when this method is
+ * called, should this call block until data becomes available?
+ *
+ * @return <code>true</code> if at least one body byte was read, otherwise
+ * <code>false</code>
*/
- protected boolean receive() throws IOException {
+ protected boolean receive(boolean block) throws IOException {
- first = false;
bodyMessage.reset();
- readMessage(bodyMessage, true);
+ if (!readMessage(bodyMessage, block)) {
+ return false;
+ }
+
+ waitingForBodyMessage = false;
+ first = false;
// No data received.
if (bodyMessage.getLen() == 0) {
@@ -960,7 +989,7 @@ public abstract class AbstractAjpProcess
*
* @return true if there is more data, false if not.
*/
- protected boolean refillReadBuffer() throws IOException {
+ protected boolean refillReadBuffer(boolean block) throws IOException {
// If the server returns an empty packet, assume that that end of
// the stream has been reached (yuck -- fix protocol??).
// FORM support
@@ -972,10 +1001,13 @@ public abstract class AbstractAjpProcess
}
// Request more data immediately
- output(getBodyMessageArray, 0, getBodyMessageArray.length);
+ if (!first && !waitingForBodyMessage) {
+ output(getBodyMessageArray, 0, getBodyMessageArray.length);
+ waitingForBodyMessage = true;
+ }
- boolean moreData = receive();
- if( !moreData ) {
+ boolean moreData = receive(block);
+ if (!first && !waitingForBodyMessage && !moreData) {
endOfStream = true;
}
return moreData;
@@ -1402,8 +1434,8 @@ public abstract class AbstractAjpProcess
finished = true;
// Swallow the unread body packet if present
- if (first && request.getContentLengthLong() > 0) {
- receive();
+ if (first && request.getContentLengthLong() > 0 || waitingForBodyMessage) {
+ receive(true);
}
// Add the end message
@@ -1424,24 +1456,22 @@ public abstract class AbstractAjpProcess
*/
protected class SocketInputBuffer implements InputBuffer {
-
/**
* Read bytes into the specified chunk.
*/
@Override
- public int doRead(ByteChunk chunk, Request req)
- throws IOException {
+ public int doRead(ByteChunk chunk, Request req) throws IOException {
if (endOfStream) {
return -1;
}
if (first && req.getContentLengthLong() > 0) {
// Handle special first-body-chunk
- if (!receive()) {
+ if (!receive(true)) {
return 0;
}
} else if (empty) {
- if (!refillReadBuffer()) {
+ if (!refillReadBuffer(true)) {
return -1;
}
}
@@ -1449,9 +1479,7 @@ public abstract class AbstractAjpProcess
chunk.setBytes(bc.getBuffer(), bc.getStart(), bc.getLength());
empty = true;
return chunk.getLength();
-
}
-
}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java Tue Sep 3 20:26:59 2013
@@ -73,6 +73,12 @@ public class AjpAprProcessor extends Abs
@Override
+ protected void registerForEvent(boolean read, boolean write) {
+ ((AprEndpoint) endpoint).getPoller().add(
+ socketWrapper.getSocket().longValue(), -1, read, write);
+ }
+
+ @Override
protected void resetTimeouts() {
// NO-OP. The AJP APR/native connector only uses the timeout value on
// time SocketWrapper for async timeouts.
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Tue Sep 3 20:26:59 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.ajp;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import org.apache.juli.logging.Log;
@@ -57,6 +58,23 @@ public class AjpNioProcessor extends Abs
@Override
+ protected void registerForEvent(boolean read, boolean write) {
+ final NioEndpoint.KeyAttachment attach =
+ (NioEndpoint.KeyAttachment)socketWrapper.getSocket().getAttachment(
+ false);
+ if (attach == null) {
+ return;
+ }
+ if (read) {
+ attach.interestOps(attach.interestOps() | SelectionKey.OP_READ);
+ }
+ if (write) {
+ attach.interestOps(attach.interestOps() | SelectionKey.OP_WRITE);
+ }
+ }
+
+
+ @Override
protected void resetTimeouts() {
// The NIO connector uses the timeout configured on the wrapper in the
// poller. Therefore, it needs to be reset once asycn processing has
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Sep 3 20:26:59 2013
@@ -70,6 +70,11 @@ public class AjpProcessor extends Abstra
@Override
+ protected void registerForEvent(boolean read, boolean write) {
+ // NO-OP for BIO
+ }
+
+ @Override
protected void resetTimeouts() {
// NO-OP. The AJP BIO connector only uses the timeout value on the
// SocketWrapper for async timeouts.
Modified:
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Tue Sep 3 20:26:59 2013
@@ -1109,15 +1109,6 @@ public abstract class AbstractHttp11Proc
/**
- * Register the socket for the specified events.
- *
- * @param read Register the socket for read events
- * @param write Register the socket for write events
- */
- protected abstract void registerForEvent(boolean read, boolean write);
-
-
- /**
* After reading the request headers, we have to setup the request filters.
*/
protected void prepareRequest() {
Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Sep 3 20:26:59 2013
@@ -496,6 +496,11 @@ public class SpdyProcessor<S> extends Ab
return null;
}
+ @Override
+ protected void registerForEvent(boolean read, boolean write) {
+ // NO-OP
+ }
+
public void onSynStream(SpdyStream str) throws IOException {
this.spdyStream = str;
SpdyFrame frame = str.reqFrame;
Added:
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1519838&view=auto
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java (added)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java Tue Sep 3 20:26:59 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.nonblocking;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.nonblocking.TestNonBlockingAPI.DataWriter;
+import org.apache.catalina.startup.TomcatBaseTest;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+public class TesterAjpNonBlockingClient extends TomcatBaseTest {
+
+ /**
+ * This is not a standard unit test. This is a test client for AJP
+ * non-blocking reads. It assumes that there is an httpd instance listening
+ * on localhost:80 that is redirecting all traffic to a default Tomcat 8
+ * instance that includes the examples web application.
+ */
+ @Test
+ public void doTestAJPNonBlockingRead() throws Exception {
+
+ Map<String, List<String>> resHeaders = new HashMap<>();
+ ByteChunk out = new ByteChunk();
+ int rc = postUrl(true, new DataWriter(2000), "http://localhost" +
+ "/examples/servlets/nonblocking/bytecounter",
+ out, resHeaders, null);
+
+ System.out.println(out.toString());
+
+ Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+ }
+}
Propchange:
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]