Author: markt Date: Thu May 30 11:51:47 2013 New Revision: 1487824 URL: http://svn.apache.org/r1487824 Log: Refactor write event registration for non-blocking IO. Unit tests pass on Windows. Need to check other platforms.
Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java tomcat/trunk/java/org/apache/coyote/Response.java tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java Thu May 30 11:51:47 2013 @@ -77,8 +77,11 @@ public class CoyoteOutputStream extends @Override public void write(int i) throws IOException { - checkNonBlockingWrite(); + boolean nonBlocking = checkNonBlockingWrite(); ob.writeByte(i); + if (nonBlocking) { + checkRegisterForWrite(); + } } @@ -90,26 +93,55 @@ public class CoyoteOutputStream extends @Override public void write(byte[] b, int off, int len) throws IOException { - checkNonBlockingWrite(); + boolean nonBlocking = checkNonBlockingWrite(); ob.write(b, off, len); + if (nonBlocking) { + checkRegisterForWrite(); + } + } + + + /** + * Will send the buffer to the client. + */ + @Override + public void flush() throws IOException { + boolean nonBlocking = checkNonBlockingWrite(); + ob.flush(); + if (nonBlocking) { + checkRegisterForWrite(); + } } - private void checkNonBlockingWrite() { - if (!ob.isBlocking() && !ob.isReady()) { + /** + * Checks for concurrent writes which are not permitted. This object has no + * state information so the call chain is + * CoyoyeOutputStream->OutputBuffer->CoyoteResponse. + * + * @return <code>true</code> if this OutputStream is currently in + * non-blocking mode. + */ + private boolean checkNonBlockingWrite() { + boolean nonBlocking = !ob.isBlocking(); + if (nonBlocking && !ob.isReady()) { throw new IllegalStateException( sm.getString("coyoteOutputStream.nbNotready")); } + return nonBlocking; } /** - * Will send the buffer to the client. + * Checks to see if there is data left in the Coyote output buffers (NOT the + * servlet output buffer) and if so registers the associated socket for + * write so the buffers will be emptied. The container will take care of + * this. As far as the app is concerned, there is a non-blocking write in + * progress. It doesn't have visibility of whether the data is buffered in + * the socket buffer or the Coyote buffers. */ - @Override - public void flush() throws IOException { - checkNonBlockingWrite(); - ob.flush(); + private void checkRegisterForWrite() { + ob.checkRegisterForWrite(); } Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Thu May 30 11:51:47 2013 @@ -645,6 +645,11 @@ public class OutputBuffer extends Writer } + /* + * All the non-blocking write state information is held in the Response so + * it is visible / accessible to all the code that needs it. + */ + public boolean isReady() { return coyoteResponse.isReady(); } @@ -658,4 +663,8 @@ public class OutputBuffer extends Writer public boolean isBlocking() { return coyoteResponse.getWriteListener() == null; } + + public void checkRegisterForWrite() { + coyoteResponse.checkRegisterForWrite(true); + } } Modified: tomcat/trunk/java/org/apache/coyote/Response.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/Response.java (original) +++ tomcat/trunk/java/org/apache/coyote/Response.java Thu May 30 11:51:47 2013 @@ -547,9 +547,15 @@ public final class Response { return outputBuffer.getBytesWritten(); } + /* + * State for non-blocking output is maintained here as it is the one point + * easily reachable from the CoyoteOutputStream and the Processor which both + * need access to state. + */ protected volatile WriteListener listener; private boolean fireListener = false; - private final Object fireListenerLock = new Object(); + private boolean registeredForWrite = false; + private final Object nonBlockingStateLock = new Object(); public WriteListener getWriteListener() { return listener; @@ -582,16 +588,27 @@ public final class Response { throw new IllegalStateException("not in non blocking mode."); } // Assume write is not possible - AtomicBoolean isReady = new AtomicBoolean(false); - synchronized (fireListenerLock) { - if (fireListener) { - // isReady() has already returned false + boolean ready = false; + synchronized (nonBlockingStateLock) { + if (registeredForWrite) { + fireListener = true; return false; } - action(ActionCode.NB_WRITE_INTEREST, isReady); - fireListener = !isReady.get(); + ready = checkRegisterForWrite(false); + fireListener = !ready; + } + return ready; + } + + public boolean checkRegisterForWrite(boolean internal) { + AtomicBoolean ready = new AtomicBoolean(false); + synchronized (nonBlockingStateLock) { + if (!registeredForWrite || internal) { + action(ActionCode.NB_WRITE_INTEREST, ready); + registeredForWrite = !ready.get(); + } } - return isReady.get(); + return ready.get(); } public void onWritePossible() throws IOException { @@ -599,7 +616,8 @@ public final class Response { // written in the Processor so if this point is reached the app is able // to write data. boolean fire = false; - synchronized (fireListenerLock) { + synchronized (nonBlockingStateLock) { + registeredForWrite = false; if (fireListener) { fireListener = false; fire = true; Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Thu May 30 11:51:47 2013 @@ -1543,7 +1543,9 @@ public abstract class AbstractHttp11Proc try { if (outputBuffer.hasDataToWrite()) { if (outputBuffer.flushBuffer(false)) { - registerForEvent(false, true); + // There is data to write but go via Response to + // maintain a consistent view of non-blocking state + response.checkRegisterForWrite(true); return SocketState.LONG; } } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu May 30 11:51:47 2013 @@ -315,9 +315,11 @@ public class InternalAprOutputBuffer ext if (bbuf.remaining() == 0) { bbuf.clear(); flipped = false; - } else { - registerWriteInterest(); } + // If there is data left in the buffer the socket will be registered for + // write further up the stack. This is to ensure the socket is only + // registered for write once as both container and user code can trigger + // write registration. } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1487824&r1=1487823&r2=1487824&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu May 30 11:51:47 2013 @@ -157,10 +157,10 @@ public class InternalNioOutputBuffer ext bytebuffer.clear(); flipped = false; } - if (flipped) { - // Still have data to write - registerWriteInterest(); - } + // If there is data left in the buffer the socket will be registered for + // write further up the stack. This is to ensure the socket is only + // registered for write once as both container and user code can trigger + // write registration. return written; } @@ -211,8 +211,13 @@ public class InternalNioOutputBuffer ext int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer()); length = length - thisTime; offset = offset + thisTime; - writeToSocket(socket.getBufHandler().getWriteBuffer(), isBlocking(), true); - dataLeft = flushBuffer(isBlocking()); + int written = writeToSocket(socket.getBufHandler().getWriteBuffer(), + isBlocking(), true); + if (written == 0) { + dataLeft = true; + } else { + dataLeft = flushBuffer(isBlocking()); + } } NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org