Author: markt
Date: Wed Jan 16 23:43:23 2013
New Revision: 1434500
URL: http://svn.apache.org/viewvc?rev=1434500&view=rev
Log:
Final parts of the puzzle to get APR/native to support the JSR356 WebSocket.
There are a small number of Autobahn failures still. I'll look at those next.
Modified:
tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Wed Jan
16 23:43:23 2013
@@ -340,7 +340,8 @@ public class Http11AprProtocol extends A
SocketWrapper<Long> socket,
ProtocolHandler httpUpgradeProcessor)
throws IOException {
- return new AprProcessor(socket, httpUpgradeProcessor);
+ return new AprProcessor(socket, httpUpgradeProcessor,
+ (AprEndpoint) proto.endpoint);
}
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprProcessor.java Wed
Jan 16 23:43:23 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.http11.upgrade
import javax.servlet.http.ProtocolHandler;
import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketWrapper;
public class AprProcessor extends AbstractProcessor<Long> {
@@ -26,10 +27,10 @@ public class AprProcessor extends Abstra
private static final int INFINITE_TIMEOUT = -1;
public AprProcessor(SocketWrapper<Long> wrapper,
- ProtocolHandler httpUpgradeProcessor) {
+ ProtocolHandler httpUpgradeProcessor, AprEndpoint endpoint) {
super(httpUpgradeProcessor,
new AprServletInputStream(wrapper),
- new AprServletOutputStream(wrapper));
+ new AprServletOutputStream(wrapper, endpoint));
Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
Wed Jan 16 23:43:23 2013
@@ -22,16 +22,21 @@ import java.util.concurrent.locks.Reentr
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketWrapper;
public class AprServletOutputStream extends AbstractServletOutputStream {
+ private final AprEndpoint endpoint;
private final SocketWrapper<Long> wrapper;
private final long socket;
private final Lock blockingStatusReadLock;
private final WriteLock blockingStatusWriteLock;
- public AprServletOutputStream(SocketWrapper<Long> wrapper) {
+ public AprServletOutputStream(SocketWrapper<Long> wrapper,
+ AprEndpoint endpoint) {
+ this.endpoint = endpoint;
this.wrapper = wrapper;
this.socket = wrapper.getSocket().longValue();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -79,12 +84,20 @@ public class AprServletOutputStream exte
}
}
- if (result < 0) {
- throw new IOException(sm.getString("apr.write.error",
- Integer.valueOf(-result)));
+ if (result >= 0) {
+ if (result < len) {
+ endpoint.getPoller().add(socket, -1, false, true);
+ }
+ return result;
+ }
+ else if (-result == Status.EAGAIN) {
+ endpoint.getPoller().add(socket, -1, false, true);
+ return 0;
}
- return result;
+ throw new IOException(sm.getString("apr.write.error",
+ Integer.valueOf(-result)));
+
}
@Override
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1434500&r1=1434499&r2=1434500&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed Jan 16
23:43:23 2013
@@ -1047,20 +1047,18 @@ public class AprEndpoint extends Abstrac
// -------------------------------------------------- SocketInfo Inner
Class
public static class SocketInfo {
- public static final int READ = 1;
- public static final int WRITE = 2;
public long socket;
public int timeout;
public int flags;
public boolean read() {
- return (flags & READ) == READ;
+ return (flags & Poll.APR_POLLIN) == Poll.APR_POLLIN;
}
public boolean write() {
- return (flags & WRITE) == WRITE;
+ return (flags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT;
}
public static int merge(int flag1, int flag2) {
- return ((flag1 & READ) | (flag2 & READ))
- | ((flag1 & WRITE) | (flag2 & WRITE));
+ return ((flag1 & Poll.APR_POLLIN) | (flag2 & Poll.APR_POLLIN))
+ | ((flag1 & Poll.APR_POLLOUT) | (flag2 & Poll.APR_POLLOUT));
}
@Override
public String toString() {
@@ -1386,7 +1384,7 @@ public class AprEndpoint extends Abstrac
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addList.add(socket, timeout, SocketInfo.READ)) {
+ if (addList.add(socket, timeout, Poll.APR_POLLIN)) {
ok = true;
this.notify();
}
@@ -1417,6 +1415,12 @@ public class AprEndpoint extends Abstrac
* @param write to do write polling
*/
public void add(long socket, int timeout, boolean read, boolean write)
{
+ add(socket, timeout,
+ (read ? Poll.APR_POLLIN : 0) |
+ (write ? Poll.APR_POLLOUT : 0));
+ }
+
+ private void add(long socket, int timeout, int flags) {
if (timeout < 0) {
timeout = getSoTimeout();
}
@@ -1428,9 +1432,7 @@ public class AprEndpoint extends Abstrac
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addList.add(socket, timeout,
- (read ? SocketInfo.READ : 0) |
- (write ? SocketInfo.WRITE : 0))) {
+ if (addList.add(socket, timeout, flags)) {
ok = true;
this.notify();
}
@@ -1630,6 +1632,9 @@ public class AprEndpoint extends Abstrac
AprSocketWrapper wrapper = connections.get(
Long.valueOf(desc[n*2+1]));
wrapper.pollerFlags = wrapper.pollerFlags &
~((int) desc[n*2]);
+ if (wrapper.pollerFlags != 0) {
+ add(desc[n*2+1], 1, wrapper.pollerFlags);
+ }
// Check for failed sockets and hand this
socket off to a worker
if (wrapper.isComet()) {
// Event processes either a read or a
write depending on what the poller returns
@@ -1665,8 +1670,18 @@ public class AprEndpoint extends Abstrac
|| ((desc[n*2] & Poll.APR_POLLNVAL) ==
Poll.APR_POLLNVAL)) {
// Close socket and clear pool
destroySocket(desc[n*2+1]);
- } else if ((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) {
- if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
+ } else if (((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN)
+ || ((desc[n*2] & Poll.APR_POLLOUT) ==
Poll.APR_POLLOUT)) {
+ boolean error = false;
+ if (((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) &&
+ !processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
+ error = true;
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ if (!error &&
+ ((desc[n*2] & Poll.APR_POLLOUT) ==
Poll.APR_POLLOUT) &&
+ !processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
// Close socket and clear pool
destroySocket(desc[n*2+1]);
}
@@ -2148,25 +2163,37 @@ public class AprEndpoint extends Abstrac
@Override
public void run() {
- synchronized (socket) {
- // Process the request from this socket
- SocketState state = handler.process(socket, status);
- if (state == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket.getSocket().longValue());
- } else if (state == Handler.SocketState.LONG) {
- socket.access();
- if (socket.async) {
- waitingRequests.add(socket);
- }
- } else if (state == Handler.SocketState.ASYNC_END) {
- socket.access();
- SocketProcessor proc = new SocketProcessor(socket,
- SocketStatus.OPEN_READ);
- getExecutor().execute(proc);
+
+ // Upgraded connections need to allow multiple threads to access
the
+ // connection at the same time to enable blocking IO to be used
when
+ // Servlet 3.1 NIO has been configured
+ if (socket != null && socket.isUpgraded()) {
+ doRun();
+ } else {
+ synchronized (socket) {
+ doRun();
}
}
}
+
+ private void doRun() {
+ // Process the request from this socket
+ SocketState state = handler.process(socket, status);
+ if (state == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ } else if (state == Handler.SocketState.LONG) {
+ socket.access();
+ if (socket.async) {
+ waitingRequests.add(socket);
+ }
+ } else if (state == Handler.SocketState.ASYNC_END) {
+ socket.access();
+ SocketProcessor proc = new SocketProcessor(socket,
+ SocketStatus.OPEN_READ);
+ getExecutor().execute(proc);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]