On 24/12/2012 11:17, ma...@apache.org wrote: > Author: markt > Date: Mon Dec 24 11:17:51 2012 > New Revision: 1425633 > > URL: http://svn.apache.org/viewvc?rev=1425633&view=rev > Log: > Allow multiple threads to process an upgraded connection at the same time. > This is required to support blocking reads/writes. The > Servlet[Input|Output]Streams do not expose an API for a blocking > [read|write]. Therefore to block a thread has to wait until > [onDataAvailable()|onWritePossible()] is called. The problem is that the > waiting thread holds a lock on the socket and there is no way through the > Servlet 3.1 API to release that lock. Until the lock is released the thread > that will eventually call [onDataAvailable()|onWritePossible()] is blocked. > So a form of deadlock occurs. To overcome this without requiring libraries > such as WebSocket implementations to access container specific APIs, Tomcat > has to allow multiple threads to process a upgraded connection at the same > time.
While this worked on Windows XP, it doesn't work on Linux which - from memory of running the Autobahn tests on the Tomcat 7.0.x web socket implementation - has a much better (i.e. faster) network implementation. The issue is that the Servlet 3.1 non-blocking proposal requires multiple container managed threads to be using the socket at the same time. I think this is limited to two threads (one reading, one writing) but the specification does not explicitly state that so there may be a requirement for more. I will be starting a discussion in the EG on this point. Regardless, prior to this commit Tomcat enforced a one socket - one thread limit and that limit will need to be removed in Tomcat 8. Some form of re-factoring will be required but before I embark any further down that road I want to take some time to review and fully understand the existing code. There are various parts that would benefit from some additional commentary and I suspect that a little clean-up will be possible as well. Mark > > Modified: > tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java > > Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java > URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) > +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 > 11:17:51 2012 > @@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac > this.status = status; > } > > - @SuppressWarnings("null") // key > @Override > public void run() { > + SelectionKey key = socket.getIOChannel().keyFor( > + socket.getPoller().getSelector()); > + KeyAttachment ka = null; > + > + if (key != null) { > + ka = (KeyAttachment)key.attachment(); > + } > + > + // Upgraded connections need to allow multiple threads to access > the > + // connection at the same time to enable blocking IO to be used > when > + // NIO has been configured > + if (ka != null && ka.isUpgraded()) { > + doRun(key, ka); > + } else { > + synchronized (socket) { > + doRun(key, ka); > + } > + } > + } > + > + private void doRun(SelectionKey key, KeyAttachment ka) { > boolean launch = false; > - synchronized (socket) { > - SelectionKey key = null; > - try { > - key = > socket.getIOChannel().keyFor(socket.getPoller().getSelector()); > - int handshake = -1; > + try { > + int handshake = -1; > > - try { > - if (key!=null) handshake = > socket.handshake(key.isReadable(), key.isWritable()); > - }catch ( IOException x ) { > - handshake = -1; > - if ( log.isDebugEnabled() ) log.debug("Error during > SSL handshake",x); > - }catch ( CancelledKeyException ckx ) { > - handshake = -1; > - } > - if ( handshake == 0 ) { > - SocketState state = SocketState.OPEN; > - // Process the request from this socket > - // Suppress null warnings for key in this block since > - // key can't be null in this block > - KeyAttachment ka = (KeyAttachment)key.attachment(); > - if (status == null) { > - state = handler.process(ka, > SocketStatus.OPEN_READ); > - } else { > - state = handler.process(ka, status); > - } > - if (state == SocketState.CLOSED) { > - // Close socket and pool > - try { > - if (ka!=null) ka.setComet(false); > - socket.getPoller().cancelledKey(key, > SocketStatus.ERROR); > - if (running && !paused) { > - nioChannels.push(socket); > - } > - socket = null; > - if (running && !paused && ka != null) { > - keyCache.push(ka); > - } > - ka = null; > - }catch ( Exception x ) { > - log.error("",x); > - } > - } else if (state == SocketState.LONG && ka != null > && ka.isAsync() && ka.interestOps() > 0) { > - //we are async, and we are interested in > operations > - ka.getPoller().add(socket, ka.interestOps()); > - } > - } else if (handshake == -1 ) { > - KeyAttachment ka = null; > - if (key!=null) { > - ka = (KeyAttachment) key.attachment(); > - socket.getPoller().cancelledKey(key, > SocketStatus.DISCONNECT); > - } > - if (running && !paused) { > - nioChannels.push(socket); > - } > - socket = null; > - if (running && !paused && ka != null) { > - keyCache.push(ka); > - } > - ka = null; > + try { > + if (key!=null) handshake = > socket.handshake(key.isReadable(), key.isWritable()); > + }catch ( IOException x ) { > + handshake = -1; > + if ( log.isDebugEnabled() ) log.debug("Error during SSL > handshake",x); > + }catch ( CancelledKeyException ckx ) { > + handshake = -1; > + } > + if ( handshake == 0 ) { > + SocketState state = SocketState.OPEN; > + // Process the request from this socket > + // Suppress null warnings for key in this block since > + // key can't be null in this block > + if (status == null) { > + state = handler.process(ka, SocketStatus.OPEN_READ); > } else { > - final SelectionKey fk = key; > - final int intops = handshake; > - final KeyAttachment ka = > (KeyAttachment)fk.attachment(); > - ka.getPoller().add(socket,intops); > - } > - }catch(CancelledKeyException cx) { > - socket.getPoller().cancelledKey(key,null); > - } catch (OutOfMemoryError oom) { > - try { > - oomParachuteData = null; > - log.error("", oom); > - if (socket != null) { > - > socket.getPoller().cancelledKey(key,SocketStatus.ERROR); > - } > - releaseCaches(); > - }catch ( Throwable oomt ) { > + state = handler.process(ka, status); > + } > + if (state == SocketState.CLOSED) { > + // Close socket and pool > try { > - System.err.println(oomParachuteMsg); > - oomt.printStackTrace(); > - }catch (Throwable letsHopeWeDontGetHere){ > - > ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); > - } > + if (ka!=null) ka.setComet(false); > + socket.getPoller().cancelledKey(key, > SocketStatus.ERROR); > + if (running && !paused) { > + nioChannels.push(socket); > + } > + socket = null; > + if (running && !paused && ka != null) { > + keyCache.push(ka); > + } > + ka = null; > + }catch ( Exception x ) { > + log.error("",x); > + } > + } else if (state == SocketState.LONG && ka != null && > ka.isAsync() && ka.interestOps() > 0) { > + //we are async, and we are interested in operations > + ka.getPoller().add(socket, ka.interestOps()); > + } > + } else if (handshake == -1 ) { > + if (key != null) { > + socket.getPoller().cancelledKey(key, > SocketStatus.DISCONNECT); > + } > + if (running && !paused) { > + nioChannels.push(socket); > } > - }catch ( Throwable t ) { > - log.error("",t); > + socket = null; > + if (running && !paused && ka != null) { > + keyCache.push(ka); > + } > + ka = null; > + } else { > + ka.getPoller().add(socket,handshake); > + } > + }catch(CancelledKeyException cx) { > + socket.getPoller().cancelledKey(key,null); > + } catch (OutOfMemoryError oom) { > + try { > + oomParachuteData = null; > + log.error("", oom); > if (socket != null) { > > socket.getPoller().cancelledKey(key,SocketStatus.ERROR); > } > - } finally { > - if (launch) { > - try { > - getExecutor().execute(new > SocketProcessor(socket, SocketStatus.OPEN_READ)); > - } catch (NullPointerException npe) { > - if (running) { > - > log.error(sm.getString("endpoint.launch.fail"), > - npe); > - } > - } > + releaseCaches(); > + }catch ( Throwable oomt ) { > + try { > + System.err.println(oomParachuteMsg); > + oomt.printStackTrace(); > + }catch (Throwable letsHopeWeDontGetHere){ > + > ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); > } > - socket = null; > - status = null; > - //return to cache > - if (running && !paused) { > - processorCache.push(this); > + } > + }catch ( Throwable t ) { > + log.error("",t); > + if (socket != null) { > + socket.getPoller().cancelledKey(key,SocketStatus.ERROR); > + } > + } finally { > + if (launch) { > + try { > + getExecutor().execute(new SocketProcessor(socket, > SocketStatus.OPEN_READ)); > + } catch (NullPointerException npe) { > + if (running) { > + log.error(sm.getString("endpoint.launch.fail"), > + npe); > + } > } > } > + socket = null; > + status = null; > + //return to cache > + if (running && !paused) { > + processorCache.push(this); > + } > } > } > } > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org > For additional commands, e-mail: dev-h...@tomcat.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org