Author: markt
Date: Mon Dec 24 11:17:51 2012
New Revision: 1425633
URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
Log:
Allow multiple threads to process an upgraded connection at the same time.
This is required to support blocking reads/writes. The
Servlet[Input|Output]Streams do not expose an API for a blocking [read|write].
Therefore, to block, a thread has to wait until
[onDataAvailable()|onWritePossible()] is called. The problem is that the
waiting thread holds a lock on the socket and there is no way through the
Servlet 3.1 API to release that lock. Until the lock is released the thread
that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So
a form of deadlock occurs. To overcome this without requiring libraries such as
WebSocket implementations to access container-specific APIs, Tomcat has to
allow multiple threads to process an upgraded connection at the same time.
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24
11:17:51 2012
@@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac
this.status = status;
}
- @SuppressWarnings("null") // key
@Override
public void run() {
+ SelectionKey key = socket.getIOChannel().keyFor(
+ socket.getPoller().getSelector());
+ KeyAttachment ka = null;
+
+ if (key != null) {
+ ka = (KeyAttachment)key.attachment();
+ }
+
+ // Upgraded connections need to allow multiple threads to access
the
+ // connection at the same time to enable blocking IO to be used
when
+ // NIO has been configured
+ if (ka != null && ka.isUpgraded()) {
+ doRun(key, ka);
+ } else {
+ synchronized (socket) {
+ doRun(key, ka);
+ }
+ }
+ }
+
+ private void doRun(SelectionKey key, KeyAttachment ka) {
boolean launch = false;
- synchronized (socket) {
- SelectionKey key = null;
- try {
- key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- int handshake = -1;
+ try {
+ int handshake = -1;
- try {
- if (key!=null) handshake =
socket.handshake(key.isReadable(), key.isWritable());
- }catch ( IOException x ) {
- handshake = -1;
- if ( log.isDebugEnabled() ) log.debug("Error during
SSL handshake",x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- SocketState state = SocketState.OPEN;
- // Process the request from this socket
- // Suppress null warnings for key in this block since
- // key can't be null in this block
- KeyAttachment ka = (KeyAttachment)key.attachment();
- if (status == null) {
- state = handler.process(ka,
SocketStatus.OPEN_READ);
- } else {
- state = handler.process(ka, status);
- }
- if (state == SocketState.CLOSED) {
- // Close socket and pool
- try {
- if (ka!=null) ka.setComet(false);
- socket.getPoller().cancelledKey(key,
SocketStatus.ERROR);
- if (running && !paused) {
- nioChannels.push(socket);
- }
- socket = null;
- if (running && !paused && ka != null) {
- keyCache.push(ka);
- }
- ka = null;
- }catch ( Exception x ) {
- log.error("",x);
- }
- } else if (state == SocketState.LONG && ka != null &&
ka.isAsync() && ka.interestOps() > 0) {
- //we are async, and we are interested in operations
- ka.getPoller().add(socket, ka.interestOps());
- }
- } else if (handshake == -1 ) {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- socket.getPoller().cancelledKey(key,
SocketStatus.DISCONNECT);
- }
- if (running && !paused) {
- nioChannels.push(socket);
- }
- socket = null;
- if (running && !paused && ka != null) {
- keyCache.push(ka);
- }
- ka = null;
+ try {
+ if (key!=null) handshake =
socket.handshake(key.isReadable(), key.isWritable());
+ }catch ( IOException x ) {
+ handshake = -1;
+ if ( log.isDebugEnabled() ) log.debug("Error during SSL
handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
+ }
+ if ( handshake == 0 ) {
+ SocketState state = SocketState.OPEN;
+ // Process the request from this socket
+ // Suppress null warnings for key in this block since
+ // key can't be null in this block
+ if (status == null) {
+ state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
- final SelectionKey fk = key;
- final int intops = handshake;
- final KeyAttachment ka =
(KeyAttachment)fk.attachment();
- ka.getPoller().add(socket,intops);
- }
- }catch(CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key,null);
- } catch (OutOfMemoryError oom) {
- try {
- oomParachuteData = null;
- log.error("", oom);
- if (socket != null) {
-
socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
- }
- releaseCaches();
- }catch ( Throwable oomt ) {
+ state = handler.process(ka, status);
+ }
+ if (state == SocketState.CLOSED) {
+ // Close socket and pool
try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){
-
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
- }
+ if (ka!=null) ka.setComet(false);
+ socket.getPoller().cancelledKey(key,
SocketStatus.ERROR);
+ if (running && !paused) {
+ nioChannels.push(socket);
+ }
+ socket = null;
+ if (running && !paused && ka != null) {
+ keyCache.push(ka);
+ }
+ ka = null;
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ } else if (state == SocketState.LONG && ka != null &&
ka.isAsync() && ka.interestOps() > 0) {
+ //we are async, and we are interested in operations
+ ka.getPoller().add(socket, ka.interestOps());
+ }
+ } else if (handshake == -1 ) {
+ if (key != null) {
+ socket.getPoller().cancelledKey(key,
SocketStatus.DISCONNECT);
+ }
+ if (running && !paused) {
+ nioChannels.push(socket);
}
- }catch ( Throwable t ) {
- log.error("",t);
+ socket = null;
+ if (running && !paused && ka != null) {
+ keyCache.push(ka);
+ }
+ ka = null;
+ } else {
+ ka.getPoller().add(socket,handshake);
+ }
+ }catch(CancelledKeyException cx) {
+ socket.getPoller().cancelledKey(key,null);
+ } catch (OutOfMemoryError oom) {
+ try {
+ oomParachuteData = null;
+ log.error("", oom);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
}
- } finally {
- if (launch) {
- try {
- getExecutor().execute(new SocketProcessor(socket,
SocketStatus.OPEN_READ));
- } catch (NullPointerException npe) {
- if (running) {
- log.error(sm.getString("endpoint.launch.fail"),
- npe);
- }
- }
+ releaseCaches();
+ }catch ( Throwable oomt ) {
+ try {
+ System.err.println(oomParachuteMsg);
+ oomt.printStackTrace();
+ }catch (Throwable letsHopeWeDontGetHere){
+ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
- socket = null;
- status = null;
- //return to cache
- if (running && !paused) {
- processorCache.push(this);
+ }
+ }catch ( Throwable t ) {
+ log.error("",t);
+ if (socket != null) {
+ socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
+ }
+ } finally {
+ if (launch) {
+ try {
+ getExecutor().execute(new SocketProcessor(socket,
SocketStatus.OPEN_READ));
+ } catch (NullPointerException npe) {
+ if (running) {
+ log.error(sm.getString("endpoint.launch.fail"),
+ npe);
+ }
}
}
+ socket = null;
+ status = null;
+ //return to cache
+ if (running && !paused) {
+ processorCache.push(this);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]