Author: markt
Date: Thu Sep 5 23:42:00 2013
New Revision: 1520458
URL: http://svn.apache.org/r1520458
Log:
Fix some issues with Servlet 3.1 non-blocking writes and APR identified by the
'unit' test.
Modified:
tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520458&r1=1520457&r2=1520458&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep
5 23:42:00 2013
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.RequestDispatcher;
import javax.servlet.http.HttpUpgradeHandler;
import org.apache.coyote.AbstractProcessor;
@@ -587,20 +588,10 @@ public abstract class AbstractAjpProcess
sm.getString("ajpprocessor.comet.notsupported"));
} else if (actionCode == ActionCode.AVAILABLE) {
- if (!endOfStream) {
- if (empty) {
- try {
- refillReadBuffer(false);
- } catch (IOException e) {
- error = true;
- return;
- }
- }
- if (empty) {
- request.setAvailable(0);
- } else {
- request.setAvailable(1);
- }
+ if (available()) {
+ request.setAvailable(1);
+ } else {
+ request.setAvailable(0);
}
} else if (actionCode == ActionCode.NB_READ_INTEREST) {
@@ -609,11 +600,12 @@ public abstract class AbstractAjpProcess
}
} else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
- // TODO
- // Until 'true' non-blocking IO is implemented, assume it is always
- // possible write data.
AtomicBoolean isReady = (AtomicBoolean)param;
- isReady.set(true);
+ boolean result = bufferedWrites.size() == 0 && responseMsgPos ==
-1;
+ isReady.set(result);
+ if (!result) {
+ registerForEvent(false, true);
+ }
} else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
AtomicBoolean result = (AtomicBoolean) param;
@@ -631,6 +623,40 @@ public abstract class AbstractAjpProcess
@Override
public SocketState asyncDispatch(SocketStatus status) {
+ if (status == SocketStatus.OPEN_WRITE) {
+ try {
+ asyncStateMachine.asyncOperation();
+ try {
+ if (hasDataToWrite()) {
+ flushBufferedData();
+ if (hasDataToWrite()) {
+ // There is data to write but go via Response to
+ // maintain a consistent view of non-blocking state
+ response.checkRegisterForWrite(true);
+ return SocketState.LONG;
+ }
+ }
+ } catch (IOException x) {
+ if (getLog().isDebugEnabled()) {
+ getLog().debug("Unable to write async data.",x);
+ }
+ status = SocketStatus.ASYNC_WRITE_ERROR;
+ request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+ }
+ } catch (IllegalStateException x) {
+ registerForEvent(false, true);
+ }
+ } else if (status == SocketStatus.OPEN_READ &&
+ request.getReadListener() != null) {
+ try {
+ if (available()) {
+ asyncStateMachine.asyncOperation();
+ }
+ } catch (IllegalStateException x) {
+ registerForEvent(true, false);
+ }
+ }
+
RequestInfo rp = request.getRequestProcessor();
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
@@ -1491,6 +1517,25 @@ public abstract class AbstractAjpProcess
}
}
+
+ private boolean available() {
+ if (endOfStream) {
+ return false;
+ }
+ if (empty) {
+ try {
+ refillReadBuffer(false);
+ } catch (IOException e) {
+ // Not ideal. This will indicate that data is available
+ // which should trigger a read which in turn will trigger
+ // another IOException and that one can be thrown.
+ return true;
+ }
+ }
+ return !empty;
+ }
+
+
private void writeData(ByteChunk chunk) throws IOException {
// Prevent timeout
socketWrapper.access();
@@ -1521,7 +1566,7 @@ public abstract class AbstractAjpProcess
bytesWritten += off;
- if (chunk.getLength() > 0) {
+ if (len > 0) {
// Add this chunk to the buffer
addToBuffers(chunk.getBuffer(), off, len);
}
@@ -1539,6 +1584,11 @@ public abstract class AbstractAjpProcess
}
+ private boolean hasDataToWrite() {
+ return responseMsgPos != -1 || bufferedWrites.size() > 0;
+ }
+
+
private void flushBufferedData() throws IOException {
if (responseMsgPos > -1) {
Modified:
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1520458&r1=1520457&r2=1520458&view=diff
==============================================================================
---
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
(original)
+++
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
Thu Sep 5 23:42:00 2013
@@ -96,6 +96,8 @@ public class TesterAjpNonBlockingClient
String resultString = result.toString();
log.info("Client read " + resultString.length() + " bytes");
+ System.out.println(resultString);
+
Assert.assertTrue(resultString.contains("00000000000000010000"));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]