Author: markt Date: Thu May 9 21:30:30 2013 New Revision: 1480786 URL: http://svn.apache.org/r1480786 Log: Implement non-blocking write for APR. Add some trace level debug code to AprEndpoint that was useful in getting this working.
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1480786&r1=1480785&r2=1480786&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu May 9 21:30:30 2013 @@ -20,13 +20,17 @@ package org.apache.coyote.http11; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Response; import org.apache.tomcat.jni.Socket; +import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.http.HttpMessages; import org.apache.tomcat.util.net.AbstractEndpoint; +import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketWrapper; /** @@ -85,6 +89,16 @@ public class InternalAprOutputBuffer ext private final ByteBuffer bbuf; + /** + * <code>false</code> if bbuf is ready to be written to and + * <code>true</code> is ready to be read from. + */ + private volatile boolean flipped = false; + + + private AbstractEndpoint endpoint; + + // --------------------------------------------------------- Public Methods @Override @@ -93,6 +107,8 @@ public class InternalAprOutputBuffer ext wrapper = socketWrapper; socket = socketWrapper.getSocket().longValue(); + this.endpoint = endpoint; + Socket.setsbb(this.socket, bbuf); } @@ -107,6 +123,8 @@ public class InternalAprOutputBuffer ext super.recycle(); bbuf.clear(); + flipped = false; + wrapper = null; } @@ -156,15 +174,21 @@ public class InternalAprOutputBuffer ext if (length == 0) return; - // Try to flush any data in the socket's write buffer first - boolean dataLeft = flushBuffer(isBlocking()); + // If bbuf is currently being used for writes, add this data to the + // write buffer + if (flipped) { + addToBuffers(buf, offset, length); + return; + } // Keep writing until all the data is written or a non-blocking write // leaves data in the buffer - while (!dataLeft && length > 0) { + while (length > 0) { int thisTime = length; if (bbuf.position() == bbuf.capacity()) { - flushBuffer(isBlocking()); + if (flushBuffer(isBlocking())) { + break; + } } if (thisTime > bbuf.capacity() - bbuf.position()) { thisTime = bbuf.capacity() - bbuf.position(); @@ -180,7 +204,6 @@ public class InternalAprOutputBuffer ext // Buffer the remaining data addToBuffers(buf, offset, length); } - } @@ -199,17 +222,16 @@ public class InternalAprOutputBuffer ext * Callback to write data from the buffer. */ @Override - protected boolean flushBuffer(boolean block) throws IOException { + protected synchronized boolean flushBuffer(boolean block) + throws IOException { wrapper.access(); - boolean dataLeft = hasMoreDataToFlush(); - - if (dataLeft) { - writeToSocket(); + if (hasMoreDataToFlush()) { + writeToSocket(block); } - if (!dataLeft && bufferedWrites!=null) { + if (bufferedWrites.size() > 0) { Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); while (!hasMoreDataToFlush() && bufIter.hasNext()) { ByteBufferHolder buffer = bufIter.next(); @@ -219,25 +241,82 @@ public class InternalAprOutputBuffer ext if (buffer.getBuf().remaining() == 0) { bufIter.remove(); } - writeToSocket(); + writeToSocket(block); //here we must break if we didn't finish the write } } } - dataLeft = hasMoreDataToFlush(); - return hasMoreDataToFlush(); } + private void writeToSocket(boolean block) throws IOException { + + Lock readLock = wrapper.getBlockingStatusReadLock(); + WriteLock writeLock = wrapper.getBlockingStatusWriteLock(); + + try { + readLock.lock(); + if (wrapper.getBlockingStatus() == block) { + writeToSocket(); + return; + } + } finally { + readLock.unlock(); + } + + try { + writeLock.lock(); + // Set the current settings for this socket + wrapper.setBlockingStatus(block); + if (block) { + Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000); + } else { + Socket.timeoutSet(socket, 0); + } + + // Downgrade the lock + try { + readLock.lock(); + writeLock.unlock(); + writeToSocket(); + } finally { + readLock.unlock(); + } + } finally { + // Should have been released above but may not have been on some + // exception paths + if (writeLock.isHeldByCurrentThread()) { + writeLock.unlock(); + } + } + } + private void writeToSocket() throws IOException { - // TODO Implement non-blocking writes - if (Socket.sendbb(socket, 0, bbuf.position()) < 0) { - throw new IOException(); + if (!flipped) { + flipped = true; + bbuf.flip(); } - bbuf.clear(); + int written; + + do { + written = Socket.sendbb(socket, bbuf.position(), bbuf.remaining()); + if (Status.APR_STATUS_IS_EAGAIN(-written)) { + written = 0; + } else if (written < 0) { + throw new IOException("APR error: " + written); + } + bbuf.position(bbuf.position() + written); + } while (written > 0 && bbuf.hasRemaining()); + + if (bbuf.remaining() == 0) { + bbuf.clear(); + flipped = false; + } else { + ((AprEndpoint) endpoint).getPoller().add(socket, -1, false, true); + } } @@ -254,7 +333,8 @@ public class InternalAprOutputBuffer ext @Override protected boolean hasMoreDataToFlush() { - return bbuf.position() > 0; + return (flipped && bbuf.remaining() > 0) || + (!flipped && bbuf.position() > 0); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1480786&r1=1480785&r2=1480786&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May 9 21:30:30 2013 @@ -671,6 +671,9 @@ public class AprEndpoint extends Abstrac // Ignore } poller = null; + if (log.isTraceEnabled()) { + log.trace("stopInternal() clearing connections map"); + } connections.clear(); if (useSendfile) { try { @@ -794,6 +797,9 @@ public class AprEndpoint extends Abstrac try { // During shutdown, executor may be null - avoid NPE if (running) { + if (log.isTraceEnabled()) { + log.trace("processSocketWithOptions(long): " + socket); + } AprSocketWrapper wrapper = new AprSocketWrapper(Long.valueOf(socket)); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); @@ -892,6 +898,9 @@ public class AprEndpoint extends Abstrac } private void destroySocket(long socket) { + if (log.isTraceEnabled()) { + log.trace("destroySocket(long): " + socket); + } // If not running the socket will be destroyed by // parent pool or acceptor socket. // In any case disable double free which would cause JVM core. @@ -1495,6 +1504,9 @@ public class AprEndpoint extends Abstrac } long socket = timeouts.check(date); while (socket != 0) { + if (log.isTraceEnabled()) { + log.trace("Poller maintain() timing out socket: " + socket); + } removeFromPoller(socket); boolean comet = connections.get( Long.valueOf(socket)).isComet(); @@ -1575,14 +1587,16 @@ public class AprEndpoint extends Abstrac } SocketInfo info = localAddList.get(); while (info != null) { + if (log.isTraceEnabled()) { + log.trace("Poller run() adding socket: " + + info.socket); + } + removeFromPoller(info.socket); + timeouts.remove(info.socket); if (info.read() || info.write()) { AprSocketWrapper wrapper = connections.get( Long.valueOf(info.socket)); boolean comet = wrapper.isComet(); - // Store timeout - if (comet) { - removeFromPoller(info.socket); - } wrapper.pollerFlags = wrapper.pollerFlags | (info.read() ? Poll.APR_POLLIN : 0) | (info.write() ? Poll.APR_POLLOUT : 0); @@ -1600,7 +1614,6 @@ public class AprEndpoint extends Abstrac } } else { // Should never happen. - timeouts.remove(info.socket); destroySocket(info.socket); getLog().warn(sm.getString( "endpoint.apr.pollAddInvalid", info)); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org