Author: markt
Date: Thu May 9 21:30:30 2013
New Revision: 1480786
URL: http://svn.apache.org/r1480786
Log:
Implement non-blocking write for APR.
Add some trace level debug code to AprEndpoint that was useful in getting this
working.
Modified:
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1480786&r1=1480785&r2=1480786&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu
May 9 21:30:30 2013
@@ -20,13 +20,17 @@ package org.apache.coyote.http11;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketWrapper;
/**
@@ -85,6 +89,16 @@ public class InternalAprOutputBuffer ext
private final ByteBuffer bbuf;
+ /**
+ * <code>false</code> if bbuf is ready to be written to and
+ * <code>true</code> is ready to be read from.
+ */
+ private volatile boolean flipped = false;
+
+
+ private AbstractEndpoint endpoint;
+
+
// --------------------------------------------------------- Public Methods
@Override
@@ -93,6 +107,8 @@ public class InternalAprOutputBuffer ext
wrapper = socketWrapper;
socket = socketWrapper.getSocket().longValue();
+ this.endpoint = endpoint;
+
Socket.setsbb(this.socket, bbuf);
}
@@ -107,6 +123,8 @@ public class InternalAprOutputBuffer ext
super.recycle();
bbuf.clear();
+ flipped = false;
+
wrapper = null;
}
@@ -156,15 +174,21 @@ public class InternalAprOutputBuffer ext
if (length == 0) return;
- // Try to flush any data in the socket's write buffer first
- boolean dataLeft = flushBuffer(isBlocking());
+ // If bbuf is currently being used for writes, add this data to the
+ // write buffer
+ if (flipped) {
+ addToBuffers(buf, offset, length);
+ return;
+ }
// Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer
- while (!dataLeft && length > 0) {
+ while (length > 0) {
int thisTime = length;
if (bbuf.position() == bbuf.capacity()) {
- flushBuffer(isBlocking());
+ if (flushBuffer(isBlocking())) {
+ break;
+ }
}
if (thisTime > bbuf.capacity() - bbuf.position()) {
thisTime = bbuf.capacity() - bbuf.position();
@@ -180,7 +204,6 @@ public class InternalAprOutputBuffer ext
// Buffer the remaining data
addToBuffers(buf, offset, length);
}
-
}
@@ -199,17 +222,16 @@ public class InternalAprOutputBuffer ext
* Callback to write data from the buffer.
*/
@Override
- protected boolean flushBuffer(boolean block) throws IOException {
+ protected synchronized boolean flushBuffer(boolean block)
+ throws IOException {
wrapper.access();
- boolean dataLeft = hasMoreDataToFlush();
-
- if (dataLeft) {
- writeToSocket();
+ if (hasMoreDataToFlush()) {
+ writeToSocket(block);
}
- if (!dataLeft && bufferedWrites!=null) {
+ if (bufferedWrites.size() > 0) {
Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
@@ -219,25 +241,82 @@ public class InternalAprOutputBuffer ext
if (buffer.getBuf().remaining() == 0) {
bufIter.remove();
}
- writeToSocket();
+ writeToSocket(block);
//here we must break if we didn't finish the write
}
}
}
- dataLeft = hasMoreDataToFlush();
-
return hasMoreDataToFlush();
}
+ private void writeToSocket(boolean block) throws IOException {
+
+ Lock readLock = wrapper.getBlockingStatusReadLock();
+ WriteLock writeLock = wrapper.getBlockingStatusWriteLock();
+
+ try {
+ readLock.lock();
+ if (wrapper.getBlockingStatus() == block) {
+ writeToSocket();
+ return;
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ try {
+ writeLock.lock();
+ // Set the current settings for this socket
+ wrapper.setBlockingStatus(block);
+ if (block) {
+ Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
+ } else {
+ Socket.timeoutSet(socket, 0);
+ }
+
+ // Downgrade the lock
+ try {
+ readLock.lock();
+ writeLock.unlock();
+ writeToSocket();
+ } finally {
+ readLock.unlock();
+ }
+ } finally {
+ // Should have been released above but may not have been on some
+ // exception paths
+ if (writeLock.isHeldByCurrentThread()) {
+ writeLock.unlock();
+ }
+ }
+ }
+
private void writeToSocket() throws IOException {
- // TODO Implement non-blocking writes
- if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
- throw new IOException();
+ if (!flipped) {
+ flipped = true;
+ bbuf.flip();
}
- bbuf.clear();
+ int written;
+
+ do {
+ written = Socket.sendbb(socket, bbuf.position(), bbuf.remaining());
+ if (Status.APR_STATUS_IS_EAGAIN(-written)) {
+ written = 0;
+ } else if (written < 0) {
+ throw new IOException("APR error: " + written);
+ }
+ bbuf.position(bbuf.position() + written);
+ } while (written > 0 && bbuf.hasRemaining());
+
+ if (bbuf.remaining() == 0) {
+ bbuf.clear();
+ flipped = false;
+ } else {
+ ((AprEndpoint) endpoint).getPoller().add(socket, -1, false, true);
+ }
}
@@ -254,7 +333,8 @@ public class InternalAprOutputBuffer ext
@Override
protected boolean hasMoreDataToFlush() {
- return bbuf.position() > 0;
+ return (flipped && bbuf.remaining() > 0) ||
+ (!flipped && bbuf.position() > 0);
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1480786&r1=1480785&r2=1480786&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May 9
21:30:30 2013
@@ -671,6 +671,9 @@ public class AprEndpoint extends Abstrac
// Ignore
}
poller = null;
+ if (log.isTraceEnabled()) {
+ log.trace("stopInternal() clearing connections map");
+ }
connections.clear();
if (useSendfile) {
try {
@@ -794,6 +797,9 @@ public class AprEndpoint extends Abstrac
try {
// During shutdown, executor may be null - avoid NPE
if (running) {
+ if (log.isTraceEnabled()) {
+ log.trace("processSocketWithOptions(long): " + socket);
+ }
AprSocketWrapper wrapper =
new AprSocketWrapper(Long.valueOf(socket));
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
@@ -892,6 +898,9 @@ public class AprEndpoint extends Abstrac
}
private void destroySocket(long socket) {
+ if (log.isTraceEnabled()) {
+ log.trace("destroySocket(long): " + socket);
+ }
// If not running the socket will be destroyed by
// parent pool or acceptor socket.
// In any case disable double free which would cause JVM core.
@@ -1495,6 +1504,9 @@ public class AprEndpoint extends Abstrac
}
long socket = timeouts.check(date);
while (socket != 0) {
+ if (log.isTraceEnabled()) {
+ log.trace("Poller maintain() timing out socket: " +
socket);
+ }
removeFromPoller(socket);
boolean comet = connections.get(
Long.valueOf(socket)).isComet();
@@ -1575,14 +1587,16 @@ public class AprEndpoint extends Abstrac
}
SocketInfo info = localAddList.get();
while (info != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Poller run() adding socket: " +
+ info.socket);
+ }
+ removeFromPoller(info.socket);
+ timeouts.remove(info.socket);
if (info.read() || info.write()) {
AprSocketWrapper wrapper = connections.get(
Long.valueOf(info.socket));
boolean comet = wrapper.isComet();
- // Store timeout
- if (comet) {
- removeFromPoller(info.socket);
- }
wrapper.pollerFlags = wrapper.pollerFlags |
(info.read() ? Poll.APR_POLLIN : 0) |
(info.write() ? Poll.APR_POLLOUT : 0);
@@ -1600,7 +1614,6 @@ public class AprEndpoint extends Abstrac
}
} else {
// Should never happen.
- timeouts.remove(info.socket);
destroySocket(info.socket);
getLog().warn(sm.getString(
"endpoint.apr.pollAddInvalid", info));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]