Author: markt
Date: Thu Sep 5 23:42:00 2013
New Revision: 1520458

URL: http://svn.apache.org/r1520458
Log:
Fix some issues with Servlet 3.1 non-blocking writes and APR identified by the 'unit' test.

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520458&r1=1520457&r2=1520458&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep 5 23:42:00 2013 @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.RequestDispatcher; import javax.servlet.http.HttpUpgradeHandler; import org.apache.coyote.AbstractProcessor; @@ -587,20 +588,10 @@ public abstract class AbstractAjpProcess sm.getString("ajpprocessor.comet.notsupported")); } else if (actionCode == ActionCode.AVAILABLE) { - if (!endOfStream) { - if (empty) { - try { - refillReadBuffer(false); - } catch (IOException e) { - error = true; - return; - } - } - if (empty) { - request.setAvailable(0); - } else { - request.setAvailable(1); - } + if (available()) { + request.setAvailable(1); + } else { + request.setAvailable(0); } } else if (actionCode == ActionCode.NB_READ_INTEREST) { @@ -609,11 +600,12 @@ public abstract class AbstractAjpProcess } } else if (actionCode == ActionCode.NB_WRITE_INTEREST) { - // TODO - // Until 'true' non-blocking IO is implemented, assume it is always - // possible write data. 
AtomicBoolean isReady = (AtomicBoolean)param; - isReady.set(true); + boolean result = bufferedWrites.size() == 0 && responseMsgPos == -1; + isReady.set(result); + if (!result) { + registerForEvent(false, true); + } } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) { AtomicBoolean result = (AtomicBoolean) param; @@ -631,6 +623,40 @@ public abstract class AbstractAjpProcess @Override public SocketState asyncDispatch(SocketStatus status) { + if (status == SocketStatus.OPEN_WRITE) { + try { + asyncStateMachine.asyncOperation(); + try { + if (hasDataToWrite()) { + flushBufferedData(); + if (hasDataToWrite()) { + // There is data to write but go via Response to + // maintain a consistent view of non-blocking state + response.checkRegisterForWrite(true); + return SocketState.LONG; + } + } + } catch (IOException x) { + if (getLog().isDebugEnabled()) { + getLog().debug("Unable to write async data.",x); + } + status = SocketStatus.ASYNC_WRITE_ERROR; + request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x); + } + } catch (IllegalStateException x) { + registerForEvent(false, true); + } + } else if (status == SocketStatus.OPEN_READ && + request.getReadListener() != null) { + try { + if (available()) { + asyncStateMachine.asyncOperation(); + } + } catch (IllegalStateException x) { + registerForEvent(true, false); + } + } + RequestInfo rp = request.getRequestProcessor(); try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); @@ -1491,6 +1517,25 @@ public abstract class AbstractAjpProcess } } + + private boolean available() { + if (endOfStream) { + return false; + } + if (empty) { + try { + refillReadBuffer(false); + } catch (IOException e) { + // Not ideal. This will indicate that data is available + // which should trigger a read which in turn will trigger + // another IOException and that one can be thrown. 
+ return true; + } + } + return !empty; + } + + private void writeData(ByteChunk chunk) throws IOException { // Prevent timeout socketWrapper.access(); @@ -1521,7 +1566,7 @@ public abstract class AbstractAjpProcess bytesWritten += off; - if (chunk.getLength() > 0) { + if (len > 0) { // Add this chunk to the buffer addToBuffers(chunk.getBuffer(), off, len); } @@ -1539,6 +1584,11 @@ public abstract class AbstractAjpProcess } + private boolean hasDataToWrite() { + return responseMsgPos != -1 || bufferedWrites.size() > 0; + } + + private void flushBufferedData() throws IOException { if (responseMsgPos > -1) { Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1520458&r1=1520457&r2=1520458&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java (original) +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java Thu Sep 5 23:42:00 2013 @@ -96,6 +96,8 @@ public class TesterAjpNonBlockingClient String resultString = result.toString(); log.info("Client read " + resultString.length() + " bytes"); + System.out.println(resultString); + Assert.assertTrue(resultString.contains("00000000000000010000")); } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org