Author: markt
Date: Thu May  9 21:30:30 2013
New Revision: 1480786

URL: http://svn.apache.org/r1480786
Log:
Implement non-blocking write for APR.
Add some trace level debug code to AprEndpoint that was useful in getting this 
working.

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1480786&r1=1480785&r2=1480786&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu 
May  9 21:30:30 2013
@@ -20,13 +20,17 @@ package org.apache.coyote.http11;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Response;
 import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.http.HttpMessages;
 import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 /**
@@ -85,6 +89,16 @@ public class InternalAprOutputBuffer ext
     private final ByteBuffer bbuf;
 
 
+    /**
+     * <code>false</code> if bbuf is ready to be written to and
+     * <code>true</code> is ready to be read from.
+     */
+    private volatile boolean flipped = false;
+
+
+    private AbstractEndpoint endpoint;
+
+
     // --------------------------------------------------------- Public Methods
 
     @Override
@@ -93,6 +107,8 @@ public class InternalAprOutputBuffer ext
 
         wrapper = socketWrapper;
         socket = socketWrapper.getSocket().longValue();
+        this.endpoint = endpoint;
+
         Socket.setsbb(this.socket, bbuf);
     }
 
@@ -107,6 +123,8 @@ public class InternalAprOutputBuffer ext
         super.recycle();
 
         bbuf.clear();
+        flipped = false;
+
         wrapper = null;
     }
 
@@ -156,15 +174,21 @@ public class InternalAprOutputBuffer ext
 
         if (length == 0) return;
 
-        // Try to flush any data in the socket's write buffer first
-        boolean dataLeft = flushBuffer(isBlocking());
+        // If bbuf is currently being used for writes, add this data to the
+        // write buffer
+        if (flipped) {
+            addToBuffers(buf, offset, length);
+            return;
+        }
 
         // Keep writing until all the data is written or a non-blocking write
         // leaves data in the buffer
-        while (!dataLeft && length > 0) {
+        while (length > 0) {
             int thisTime = length;
             if (bbuf.position() == bbuf.capacity()) {
-                flushBuffer(isBlocking());
+                if (flushBuffer(isBlocking())) {
+                    break;
+                }
             }
             if (thisTime > bbuf.capacity() - bbuf.position()) {
                 thisTime = bbuf.capacity() - bbuf.position();
@@ -180,7 +204,6 @@ public class InternalAprOutputBuffer ext
             // Buffer the remaining data
             addToBuffers(buf, offset, length);
         }
-
     }
 
 
@@ -199,17 +222,16 @@ public class InternalAprOutputBuffer ext
      * Callback to write data from the buffer.
      */
     @Override
-    protected boolean flushBuffer(boolean block) throws IOException {
+    protected synchronized boolean flushBuffer(boolean block)
+            throws IOException {
 
         wrapper.access();
 
-        boolean dataLeft = hasMoreDataToFlush();
-
-        if (dataLeft) {
-            writeToSocket();
+        if (hasMoreDataToFlush()) {
+            writeToSocket(block);
         }
 
-        if (!dataLeft && bufferedWrites!=null) {
+        if (bufferedWrites.size() > 0) {
             Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
             while (!hasMoreDataToFlush() && bufIter.hasNext()) {
                 ByteBufferHolder buffer = bufIter.next();
@@ -219,25 +241,82 @@ public class InternalAprOutputBuffer ext
                     if (buffer.getBuf().remaining() == 0) {
                         bufIter.remove();
                     }
-                    writeToSocket();
+                    writeToSocket(block);
                     //here we must break if we didn't finish the write
                 }
             }
         }
 
-        dataLeft = hasMoreDataToFlush();
-
         return hasMoreDataToFlush();
     }
 
 
+    private void writeToSocket(boolean block) throws IOException {
+
+        Lock readLock = wrapper.getBlockingStatusReadLock();
+        WriteLock writeLock = wrapper.getBlockingStatusWriteLock();
+
+        try {
+            readLock.lock();
+            if (wrapper.getBlockingStatus() == block) {
+                writeToSocket();
+                return;
+            }
+        } finally {
+            readLock.unlock();
+        }
+
+        try {
+            writeLock.lock();
+            // Set the current settings for this socket
+            wrapper.setBlockingStatus(block);
+            if (block) {
+                Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
+            } else {
+                Socket.timeoutSet(socket, 0);
+            }
+
+            // Downgrade the lock
+            try {
+                readLock.lock();
+                writeLock.unlock();
+                writeToSocket();
+            } finally {
+                readLock.unlock();
+            }
+        } finally {
+            // Should have been released above but may not have been on some
+            // exception paths
+            if (writeLock.isHeldByCurrentThread()) {
+                writeLock.unlock();
+            }
+        }
+    }
+
     private void writeToSocket() throws IOException {
-        // TODO Implement non-blocking writes
-        if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
-            throw new IOException();
+        if (!flipped) {
+            flipped = true;
+            bbuf.flip();
         }
-        bbuf.clear();
 
+        int written;
+
+        do {
+            written = Socket.sendbb(socket, bbuf.position(), bbuf.remaining());
+            if (Status.APR_STATUS_IS_EAGAIN(-written)) {
+                written = 0;
+            } else if (written < 0) {
+                throw new IOException("APR error: " + written);
+            }
+            bbuf.position(bbuf.position() + written);
+        } while (written > 0 && bbuf.hasRemaining());
+
+        if (bbuf.remaining() == 0) {
+            bbuf.clear();
+            flipped = false;
+        } else {
+            ((AprEndpoint) endpoint).getPoller().add(socket, -1, false, true);
+        }
     }
 
 
@@ -254,7 +333,8 @@ public class InternalAprOutputBuffer ext
 
     @Override
     protected boolean hasMoreDataToFlush() {
-        return bbuf.position() > 0;
+        return (flipped && bbuf.remaining() > 0) ||
+                (!flipped && bbuf.position() > 0);
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1480786&r1=1480785&r2=1480786&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May  9 
21:30:30 2013
@@ -671,6 +671,9 @@ public class AprEndpoint extends Abstrac
                 // Ignore
             }
             poller = null;
+            if (log.isTraceEnabled()) {
+                log.trace("stopInternal() clearing connections map");
+            }
             connections.clear();
             if (useSendfile) {
                 try {
@@ -794,6 +797,9 @@ public class AprEndpoint extends Abstrac
         try {
             // During shutdown, executor may be null - avoid NPE
             if (running) {
+                if (log.isTraceEnabled()) {
+                    log.trace("processSocketWithOptions(long): " + socket);
+                }
                 AprSocketWrapper wrapper =
                         new AprSocketWrapper(Long.valueOf(socket));
                 wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
@@ -892,6 +898,9 @@ public class AprEndpoint extends Abstrac
     }
 
     private void destroySocket(long socket) {
+        if (log.isTraceEnabled()) {
+            log.trace("destroySocket(long): " + socket);
+        }
         // If not running the socket will be destroyed by
         // parent pool or acceptor socket.
         // In any case disable double free which would cause JVM core.
@@ -1495,6 +1504,9 @@ public class AprEndpoint extends Abstrac
             }
             long socket = timeouts.check(date);
             while (socket != 0) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Poller maintain() timing out socket: " + 
socket);
+                }
                 removeFromPoller(socket);
                 boolean comet = connections.get(
                         Long.valueOf(socket)).isComet();
@@ -1575,14 +1587,16 @@ public class AprEndpoint extends Abstrac
                         }
                         SocketInfo info = localAddList.get();
                         while (info != null) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Poller run() adding socket: " +
+                                        info.socket);
+                            }
+                            removeFromPoller(info.socket);
+                            timeouts.remove(info.socket);
                             if (info.read() || info.write()) {
                                 AprSocketWrapper wrapper = connections.get(
                                         Long.valueOf(info.socket));
                                 boolean comet = wrapper.isComet();
-                                // Store timeout
-                                if (comet) {
-                                    removeFromPoller(info.socket);
-                                }
                                 wrapper.pollerFlags = wrapper.pollerFlags |
                                         (info.read() ? Poll.APR_POLLIN : 0) |
                                         (info.write() ? Poll.APR_POLLOUT : 0);
@@ -1600,7 +1614,6 @@ public class AprEndpoint extends Abstrac
                                 }
                             } else {
                                 // Should never happen.
-                                timeouts.remove(info.socket);
                                 destroySocket(info.socket);
                                 getLog().warn(sm.getString(
                                         "endpoint.apr.pollAddInvalid", info));



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to