Author: markt
Date: Wed Jan 16 23:43:23 2013
New Revision: 1434500

URL: http://svn.apache.org/viewvc?rev=1434500&view=rev
Log:
Final parts of the puzzle to get APR/native to support the JSR356 WebSocket. 
There are a small number of Autobahn failures still. I'll look at those next.

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java
    
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Wed Jan 
16 23:43:23 2013
@@ -340,7 +340,8 @@ public class Http11AprProtocol extends A
                 SocketWrapper<Long> socket,
                 ProtocolHandler httpUpgradeProcessor)
                 throws IOException {
-            return new AprProcessor(socket, httpUpgradeProcessor);
+            return new AprProcessor(socket, httpUpgradeProcessor,
+                    (AprEndpoint) proto.endpoint);
         }
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java Wed 
Jan 16 23:43:23 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.http11.upgrade
 import javax.servlet.http.ProtocolHandler;
 
 import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 public class AprProcessor extends AbstractProcessor<Long> {
@@ -26,10 +27,10 @@ public class AprProcessor extends Abstra
     private static final int INFINITE_TIMEOUT = -1;
 
     public AprProcessor(SocketWrapper<Long> wrapper,
-            ProtocolHandler httpUpgradeProcessor) {
+            ProtocolHandler httpUpgradeProcessor, AprEndpoint endpoint) {
         super(httpUpgradeProcessor,
                 new AprServletInputStream(wrapper),
-                new AprServletOutputStream(wrapper));
+                new AprServletOutputStream(wrapper, endpoint));
 
         Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
     }

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java 
(original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java 
Wed Jan 16 23:43:23 2013
@@ -22,16 +22,21 @@ import java.util.concurrent.locks.Reentr
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 public class AprServletOutputStream extends AbstractServletOutputStream {
 
+    private final AprEndpoint endpoint;
     private final SocketWrapper<Long> wrapper;
     private final long socket;
     private final Lock blockingStatusReadLock;
     private final WriteLock blockingStatusWriteLock;
 
-    public AprServletOutputStream(SocketWrapper<Long> wrapper) {
+    public AprServletOutputStream(SocketWrapper<Long> wrapper,
+            AprEndpoint endpoint) {
+        this.endpoint = endpoint;
         this.wrapper = wrapper;
         this.socket = wrapper.getSocket().longValue();
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -79,12 +84,20 @@ public class AprServletOutputStream exte
             }
         }
 
-        if (result < 0) {
-            throw new IOException(sm.getString("apr.write.error",
-                    Integer.valueOf(-result)));
+        if (result >= 0) {
+            if (result < len) {
+                endpoint.getPoller().add(socket, -1, false, true);
+            }
+            return result;
+        }
+        else if (-result == Status.EAGAIN) {
+            endpoint.getPoller().add(socket, -1, false, true);
+            return 0;
         }
 
-        return result;
+        throw new IOException(sm.getString("apr.write.error",
+                Integer.valueOf(-result)));
+
     }
 
     @Override

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed Jan 16 
23:43:23 2013
@@ -1047,20 +1047,18 @@ public class AprEndpoint extends Abstrac
     // -------------------------------------------------- SocketInfo Inner 
Class
 
     public static class SocketInfo {
-        public static final int READ = 1;
-        public static final int WRITE = 2;
         public long socket;
         public int timeout;
         public int flags;
         public boolean read() {
-            return (flags & READ) == READ;
+            return (flags & Poll.APR_POLLIN) == Poll.APR_POLLIN;
         }
         public boolean write() {
-            return (flags & WRITE) == WRITE;
+            return (flags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT;
         }
         public static int merge(int flag1, int flag2) {
-            return ((flag1 & READ) | (flag2 & READ))
-                | ((flag1 & WRITE) | (flag2 & WRITE));
+            return ((flag1 & Poll.APR_POLLIN) | (flag2 & Poll.APR_POLLIN))
+                | ((flag1 & Poll.APR_POLLOUT) | (flag2 & Poll.APR_POLLOUT));
         }
         @Override
         public String toString() {
@@ -1386,7 +1384,7 @@ public class AprEndpoint extends Abstrac
             synchronized (this) {
                 // Add socket to the list. Newly added sockets will wait
                 // at most for pollTime before being polled
-                if (addList.add(socket, timeout, SocketInfo.READ)) {
+                if (addList.add(socket, timeout, Poll.APR_POLLIN)) {
                     ok = true;
                     this.notify();
                 }
@@ -1417,6 +1415,12 @@ public class AprEndpoint extends Abstrac
          * @param write to do write polling
          */
         public void add(long socket, int timeout, boolean read, boolean write) 
{
+            add(socket, timeout,
+                    (read ? Poll.APR_POLLIN : 0) |
+                    (write ? Poll.APR_POLLOUT : 0));
+        }
+
+        private void add(long socket, int timeout, int flags) {
             if (timeout < 0) {
                 timeout = getSoTimeout();
             }
@@ -1428,9 +1432,7 @@ public class AprEndpoint extends Abstrac
             synchronized (this) {
                 // Add socket to the list. Newly added sockets will wait
                 // at most for pollTime before being polled
-                if (addList.add(socket, timeout,
-                        (read ? SocketInfo.READ : 0) |
-                                (write ? SocketInfo.WRITE : 0))) {
+                if (addList.add(socket, timeout, flags)) {
                     ok = true;
                     this.notify();
                 }
@@ -1630,6 +1632,9 @@ public class AprEndpoint extends Abstrac
                                 AprSocketWrapper wrapper = connections.get(
                                         Long.valueOf(desc[n*2+1]));
                                 wrapper.pollerFlags = wrapper.pollerFlags & 
~((int) desc[n*2]);
+                                if (wrapper.pollerFlags != 0) {
+                                    add(desc[n*2+1], 1, wrapper.pollerFlags);
+                                }
                                 // Check for failed sockets and hand this 
socket off to a worker
                                 if (wrapper.isComet()) {
                                     // Event processes either a read or a 
write depending on what the poller returns
@@ -1665,8 +1670,18 @@ public class AprEndpoint extends Abstrac
                                         || ((desc[n*2] & Poll.APR_POLLNVAL) == 
Poll.APR_POLLNVAL)) {
                                     // Close socket and clear pool
                                     destroySocket(desc[n*2+1]);
-                                } else if ((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN) {
-                                    if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
+                                } else if (((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN)
+                                        || ((desc[n*2] & Poll.APR_POLLOUT) == 
Poll.APR_POLLOUT)) {
+                                    boolean error = false;
+                                    if (((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN) &&
+                                            !processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
+                                        error = true;
+                                        // Close socket and clear pool
+                                        destroySocket(desc[n*2+1]);
+                                    }
+                                    if (!error &&
+                                            ((desc[n*2] & Poll.APR_POLLOUT) == 
Poll.APR_POLLOUT) &&
+                                            !processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
                                         // Close socket and clear pool
                                         destroySocket(desc[n*2+1]);
                                     }
@@ -2148,25 +2163,37 @@ public class AprEndpoint extends Abstrac
 
         @Override
         public void run() {
-            synchronized (socket) {
-                // Process the request from this socket
-                SocketState state = handler.process(socket, status);
-                if (state == Handler.SocketState.CLOSED) {
-                    // Close socket and pool
-                    destroySocket(socket.getSocket().longValue());
-                } else if (state == Handler.SocketState.LONG) {
-                    socket.access();
-                    if (socket.async) {
-                        waitingRequests.add(socket);
-                    }
-                } else if (state == Handler.SocketState.ASYNC_END) {
-                    socket.access();
-                    SocketProcessor proc = new SocketProcessor(socket,
-                            SocketStatus.OPEN_READ);
-                    getExecutor().execute(proc);
+
+            // Upgraded connections need to allow multiple threads to access 
the
+            // connection at the same time to enable blocking IO to be used 
when
+            // Servlet 3.1 NIO has been configured
+            if (socket != null && socket.isUpgraded()) {
+                doRun();
+            } else {
+                synchronized (socket) {
+                    doRun();
                 }
             }
         }
+
+        private void doRun() {
+            // Process the request from this socket
+            SocketState state = handler.process(socket, status);
+            if (state == Handler.SocketState.CLOSED) {
+                // Close socket and pool
+                destroySocket(socket.getSocket().longValue());
+            } else if (state == Handler.SocketState.LONG) {
+                socket.access();
+                if (socket.async) {
+                    waitingRequests.add(socket);
+                }
+            } else if (state == Handler.SocketState.ASYNC_END) {
+                socket.access();
+                SocketProcessor proc = new SocketProcessor(socket,
+                        SocketStatus.OPEN_READ);
+                getExecutor().execute(proc);
+            }
+        }
     }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to