Author: markt Date: Mon Dec 24 11:17:51 2012 New Revision: 1425633 URL: http://svn.apache.org/viewvc?rev=1425633&view=rev Log: Allow multiple threads to process an upgraded connection at the same time. This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore, to block, a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released, the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container-specific APIs, Tomcat has to allow multiple threads to process an upgraded connection at the same time.
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 11:17:51 2012 @@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac this.status = status; } - @SuppressWarnings("null") // key @Override public void run() { + SelectionKey key = socket.getIOChannel().keyFor( + socket.getPoller().getSelector()); + KeyAttachment ka = null; + + if (key != null) { + ka = (KeyAttachment)key.attachment(); + } + + // Upgraded connections need to allow multiple threads to access the + // connection at the same time to enable blocking IO to be used when + // NIO has been configured + if (ka != null && ka.isUpgraded()) { + doRun(key, ka); + } else { + synchronized (socket) { + doRun(key, ka); + } + } + } + + private void doRun(SelectionKey key, KeyAttachment ka) { boolean launch = false; - synchronized (socket) { - SelectionKey key = null; - try { - key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - int handshake = -1; + try { + int handshake = -1; - try { - if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable()); - }catch ( IOException x ) { - handshake = -1; - if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); - }catch ( CancelledKeyException ckx ) { - handshake = -1; - } - if ( handshake == 0 ) { - SocketState state = SocketState.OPEN; - // Process the request from this socket - // Suppress null warnings for key in this block since - // key can't be null in this block - KeyAttachment ka = (KeyAttachment)key.attachment(); - if (status == null) { - state = 
handler.process(ka, SocketStatus.OPEN_READ); - } else { - state = handler.process(ka, status); - } - if (state == SocketState.CLOSED) { - // Close socket and pool - try { - if (ka!=null) ka.setComet(false); - socket.getPoller().cancelledKey(key, SocketStatus.ERROR); - if (running && !paused) { - nioChannels.push(socket); - } - socket = null; - if (running && !paused && ka != null) { - keyCache.push(ka); - } - ka = null; - }catch ( Exception x ) { - log.error("",x); - } - } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) { - //we are async, and we are interested in operations - ka.getPoller().add(socket, ka.interestOps()); - } - } else if (handshake == -1 ) { - KeyAttachment ka = null; - if (key!=null) { - ka = (KeyAttachment) key.attachment(); - socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT); - } - if (running && !paused) { - nioChannels.push(socket); - } - socket = null; - if (running && !paused && ka != null) { - keyCache.push(ka); - } - ka = null; + try { + if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable()); + }catch ( IOException x ) { + handshake = -1; + if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); + }catch ( CancelledKeyException ckx ) { + handshake = -1; + } + if ( handshake == 0 ) { + SocketState state = SocketState.OPEN; + // Process the request from this socket + // Suppress null warnings for key in this block since + // key can't be null in this block + if (status == null) { + state = handler.process(ka, SocketStatus.OPEN_READ); } else { - final SelectionKey fk = key; - final int intops = handshake; - final KeyAttachment ka = (KeyAttachment)fk.attachment(); - ka.getPoller().add(socket,intops); - } - }catch(CancelledKeyException cx) { - socket.getPoller().cancelledKey(key,null); - } catch (OutOfMemoryError oom) { - try { - oomParachuteData = null; - log.error("", oom); - if (socket != null) { - 
socket.getPoller().cancelledKey(key,SocketStatus.ERROR); - } - releaseCaches(); - }catch ( Throwable oomt ) { + state = handler.process(ka, status); + } + if (state == SocketState.CLOSED) { + // Close socket and pool try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - }catch (Throwable letsHopeWeDontGetHere){ - ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); - } + if (ka!=null) ka.setComet(false); + socket.getPoller().cancelledKey(key, SocketStatus.ERROR); + if (running && !paused) { + nioChannels.push(socket); + } + socket = null; + if (running && !paused && ka != null) { + keyCache.push(ka); + } + ka = null; + }catch ( Exception x ) { + log.error("",x); + } + } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) { + //we are async, and we are interested in operations + ka.getPoller().add(socket, ka.interestOps()); + } + } else if (handshake == -1 ) { + if (key != null) { + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT); + } + if (running && !paused) { + nioChannels.push(socket); } - }catch ( Throwable t ) { - log.error("",t); + socket = null; + if (running && !paused && ka != null) { + keyCache.push(ka); + } + ka = null; + } else { + ka.getPoller().add(socket,handshake); + } + }catch(CancelledKeyException cx) { + socket.getPoller().cancelledKey(key,null); + } catch (OutOfMemoryError oom) { + try { + oomParachuteData = null; + log.error("", oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR); } - } finally { - if (launch) { - try { - getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); - } catch (NullPointerException npe) { - if (running) { - log.error(sm.getString("endpoint.launch.fail"), - npe); - } - } + releaseCaches(); + }catch ( Throwable oomt ) { + try { + System.err.println(oomParachuteMsg); + oomt.printStackTrace(); + }catch (Throwable letsHopeWeDontGetHere){ + ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } - 
socket = null; - status = null; - //return to cache - if (running && !paused) { - processorCache.push(this); + } + }catch ( Throwable t ) { + log.error("",t); + if (socket != null) { + socket.getPoller().cancelledKey(key,SocketStatus.ERROR); + } + } finally { + if (launch) { + try { + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); + } catch (NullPointerException npe) { + if (running) { + log.error(sm.getString("endpoint.launch.fail"), + npe); + } } } + socket = null; + status = null; + //return to cache + if (running && !paused) { + processorCache.push(this); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org