Author: markt
Date: Wed Jan 16 23:43:23 2013
New Revision: 1434500

URL: http://svn.apache.org/viewvc?rev=1434500&view=rev
Log:
Final parts of the puzzle to get APR/native to support the JSR356 WebSocket.
There are a small number of Autobahn failures still. I'll look at those next.

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1434500&r1=1434499&r2=1434500&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Wed Jan 16 23:43:23 2013 @@ -340,7 +340,8 @@ public class Http11AprProtocol extends A SocketWrapper<Long> socket, ProtocolHandler httpUpgradeProcessor) throws IOException { - return new AprProcessor(socket, httpUpgradeProcessor); + return new AprProcessor(socket, httpUpgradeProcessor, + (AprEndpoint) proto.endpoint); } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java?rev=1434500&r1=1434499&r2=1434500&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java Wed Jan 16 23:43:23 2013 @@ -19,6 +19,7 @@ package org.apache.coyote.http11.upgrade import javax.servlet.http.ProtocolHandler; import org.apache.tomcat.jni.Socket; +import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketWrapper; public class AprProcessor extends AbstractProcessor<Long> { @@ -26,10 +27,10 @@ public class AprProcessor extends Abstra private static final int INFINITE_TIMEOUT = -1; public AprProcessor(SocketWrapper<Long> 
wrapper, - ProtocolHandler httpUpgradeProcessor) { + ProtocolHandler httpUpgradeProcessor, AprEndpoint endpoint) { super(httpUpgradeProcessor, new AprServletInputStream(wrapper), - new AprServletOutputStream(wrapper)); + new AprServletOutputStream(wrapper, endpoint)); Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT); } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1434500&r1=1434499&r2=1434500&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java Wed Jan 16 23:43:23 2013 @@ -22,16 +22,21 @@ import java.util.concurrent.locks.Reentr import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.tomcat.jni.Socket; +import org.apache.tomcat.jni.Status; +import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketWrapper; public class AprServletOutputStream extends AbstractServletOutputStream { + private final AprEndpoint endpoint; private final SocketWrapper<Long> wrapper; private final long socket; private final Lock blockingStatusReadLock; private final WriteLock blockingStatusWriteLock; - public AprServletOutputStream(SocketWrapper<Long> wrapper) { + public AprServletOutputStream(SocketWrapper<Long> wrapper, + AprEndpoint endpoint) { + this.endpoint = endpoint; this.wrapper = wrapper; this.socket = wrapper.getSocket().longValue(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -79,12 +84,20 @@ public class AprServletOutputStream exte } } - if (result < 0) { - throw new IOException(sm.getString("apr.write.error", - Integer.valueOf(-result))); + if (result >= 0) { + if (result < len) { + 
endpoint.getPoller().add(socket, -1, false, true); + } + return result; + } + else if (-result == Status.EAGAIN) { + endpoint.getPoller().add(socket, -1, false, true); + return 0; } - return result; + throw new IOException(sm.getString("apr.write.error", + Integer.valueOf(-result))); + } @Override Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1434500&r1=1434499&r2=1434500&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed Jan 16 23:43:23 2013 @@ -1047,20 +1047,18 @@ public class AprEndpoint extends Abstrac // -------------------------------------------------- SocketInfo Inner Class public static class SocketInfo { - public static final int READ = 1; - public static final int WRITE = 2; public long socket; public int timeout; public int flags; public boolean read() { - return (flags & READ) == READ; + return (flags & Poll.APR_POLLIN) == Poll.APR_POLLIN; } public boolean write() { - return (flags & WRITE) == WRITE; + return (flags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT; } public static int merge(int flag1, int flag2) { - return ((flag1 & READ) | (flag2 & READ)) - | ((flag1 & WRITE) | (flag2 & WRITE)); + return ((flag1 & Poll.APR_POLLIN) | (flag2 & Poll.APR_POLLIN)) + | ((flag1 & Poll.APR_POLLOUT) | (flag2 & Poll.APR_POLLOUT)); } @Override public String toString() { @@ -1386,7 +1384,7 @@ public class AprEndpoint extends Abstrac synchronized (this) { // Add socket to the list. 
Newly added sockets will wait // at most for pollTime before being polled - if (addList.add(socket, timeout, SocketInfo.READ)) { + if (addList.add(socket, timeout, Poll.APR_POLLIN)) { ok = true; this.notify(); } @@ -1417,6 +1415,12 @@ public class AprEndpoint extends Abstrac * @param write to do write polling */ public void add(long socket, int timeout, boolean read, boolean write) { + add(socket, timeout, + (read ? Poll.APR_POLLIN : 0) | + (write ? Poll.APR_POLLOUT : 0)); + } + + private void add(long socket, int timeout, int flags) { if (timeout < 0) { timeout = getSoTimeout(); } @@ -1428,9 +1432,7 @@ public class AprEndpoint extends Abstrac synchronized (this) { // Add socket to the list. Newly added sockets will wait // at most for pollTime before being polled - if (addList.add(socket, timeout, - (read ? SocketInfo.READ : 0) | - (write ? SocketInfo.WRITE : 0))) { + if (addList.add(socket, timeout, flags)) { ok = true; this.notify(); } @@ -1630,6 +1632,9 @@ public class AprEndpoint extends Abstrac AprSocketWrapper wrapper = connections.get( Long.valueOf(desc[n*2+1])); wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); + if (wrapper.pollerFlags != 0) { + add(desc[n*2+1], 1, wrapper.pollerFlags); + } // Check for failed sockets and hand this socket off to a worker if (wrapper.isComet()) { // Event processes either a read or a write depending on what the poller returns @@ -1665,8 +1670,18 @@ public class AprEndpoint extends Abstrac || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { // Close socket and clear pool destroySocket(desc[n*2+1]); - } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { + } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) + || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) { + boolean error = false; + if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && + !processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { + error = true; + 
// Close socket and clear pool + destroySocket(desc[n*2+1]); + } + if (!error && + ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && + !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { // Close socket and clear pool destroySocket(desc[n*2+1]); } @@ -2148,25 +2163,37 @@ public class AprEndpoint extends Abstrac @Override public void run() { - synchronized (socket) { - // Process the request from this socket - SocketState state = handler.process(socket, status); - if (state == Handler.SocketState.CLOSED) { - // Close socket and pool - destroySocket(socket.getSocket().longValue()); - } else if (state == Handler.SocketState.LONG) { - socket.access(); - if (socket.async) { - waitingRequests.add(socket); - } - } else if (state == Handler.SocketState.ASYNC_END) { - socket.access(); - SocketProcessor proc = new SocketProcessor(socket, - SocketStatus.OPEN_READ); - getExecutor().execute(proc); + + // Upgraded connections need to allow multiple threads to access the + // connection at the same time to enable blocking IO to be used when + // Servlet 3.1 NIO has been configured + if (socket != null && socket.isUpgraded()) { + doRun(); + } else { + synchronized (socket) { + doRun(); } } } + + private void doRun() { + // Process the request from this socket + SocketState state = handler.process(socket, status); + if (state == Handler.SocketState.CLOSED) { + // Close socket and pool + destroySocket(socket.getSocket().longValue()); + } else if (state == Handler.SocketState.LONG) { + socket.access(); + if (socket.async) { + waitingRequests.add(socket); + } + } else if (state == Handler.SocketState.ASYNC_END) { + socket.access(); + SocketProcessor proc = new SocketProcessor(socket, + SocketStatus.OPEN_READ); + getExecutor().execute(proc); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org