Author: markt Date: Thu Aug 22 10:11:27 2013 New Revision: 1516407 URL: http://svn.apache.org/r1516407 Log: Back-port NIO endpoint changes required to support concurrent read/write for JSR-356 upgraded connections
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1516407&r1=1516406&r2=1516407&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Aug 22 10:11:27 2013 @@ -1260,7 +1260,7 @@ public class NioEndpoint extends Abstrac processSocket(channel, SocketStatus.DISCONNECT, true); } else { //future placement of a WRITE notif - if (!processSocket(channel, SocketStatus.OPEN_READ, true)) + if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) processSocket(channel, SocketStatus.DISCONNECT, true); } } else { @@ -1269,8 +1269,23 @@ public class NioEndpoint extends Abstrac } else { //later on, improve latch behavior if ( isWorkerAvailable() ) { - unreg(sk, attachment,sk.readyOps()); - boolean close = (!processSocket(channel, null, true)); + + boolean readAndWrite = sk.isReadable() && sk.isWritable(); + reg(sk, attachment, 0); + if (attachment.isAsync() && readAndWrite) { + //remember the that we want to know about write too + attachment.interestOps(SelectionKey.OP_WRITE); + } + //read goes before write + if (sk.isReadable()) { + //read notification + if (!processSocket(channel, SocketStatus.OPEN_READ, true)) + close = true; + } else { + //future placement of a WRITE notif + if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) + close = true; + } if (close) { cancelledKey(sk,SocketStatus.DISCONNECT,false); } @@ -1319,7 +1334,9 @@ public class NioEndpoint extends Abstrac cancelledKey(sk,SocketStatus.ERROR,false); return false; } - sd.fchannel = new FileInputStream(f).getChannel(); + @SuppressWarnings("resource") // Closed when channel is closed + FileInputStream fis = new FileInputStream(f); + sd.fchannel = fis.getChannel(); } //configure output channel @@ -1670,104 +1687,136 @@ public class NioEndpoint extends Abstrac @Override public void run() { + SelectionKey key = socket.getIOChannel().keyFor( + socket.getPoller().getSelector()); + KeyAttachment ka = null; + + if (key != null) { + ka = (KeyAttachment)key.attachment(); + } + + // Upgraded connections need to allow multiple threads to access the + // connection at the same time to enable blocking IO to be used when + // NIO has been configured + if (ka != null && ka.isUpgraded() && + SocketStatus.OPEN_WRITE == status) { + synchronized (ka.getWriteThreadLock()) { + doRun(key, ka); + } + } else { + synchronized (socket) { + doRun(key, ka); + } + } + } + + private void doRun(SelectionKey key, KeyAttachment ka) { boolean launch = false; - synchronized (socket) { - SelectionKey key = null; - try { - key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - int handshake = -1; + try { + int handshake = -1; - try { - if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable()); - }catch ( IOException x ) { - handshake = -1; - if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); - }catch ( CancelledKeyException ckx ) { - handshake = -1; - } - if ( handshake == 0 ) { - SocketState state = SocketState.OPEN; - // Process the request from this socket - if (status == null) { - state = handler.process( - (KeyAttachment) key.attachment(), - SocketStatus.OPEN_READ); + try { + if (key != null) { + // For STOP there is no point trying to handshake as the + // Poller has been stopped. + if (socket.isHandshakeComplete() || + status == SocketStatus.STOP) { + handshake = 0; } else { - state = handler.process( - (KeyAttachment) key.attachment(), - status); + handshake = socket.handshake( + key.isReadable(), key.isWritable()); + // The handshake process reads/writes from/to the + // socket. status may therefore be OPEN_WRITE once + // the handshake completes. However, the handshake + // happens when the socket is opened so the status + // must always be OPEN_READ after it completes. It + // is OK to always set this as it is only used if + // the handshake completes. + status = SocketStatus.OPEN_READ; } - - if (state == SocketState.CLOSED) { - // Close socket and pool - try { - KeyAttachment ka = null; - if (key!=null) { - ka = (KeyAttachment) key.attachment(); - if (ka!=null) ka.setComet(false); - socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false); - } + } + }catch ( IOException x ) { + handshake = -1; + if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); + }catch ( CancelledKeyException ckx ) { + handshake = -1; + } + if ( handshake == 0 ) { + SocketState state = SocketState.OPEN; + // Process the request from this socket + if (status == null) { + state = handler.process(ka, SocketStatus.OPEN_READ); + } else { + state = handler.process(ka, status); + } + if (state == SocketState.CLOSED) { + // Close socket and pool + try { + if (ka!=null) ka.setComet(false); + socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false); + if (running && !paused) { nioChannels.offer(socket); - socket = null; - if ( ka!=null ) keyCache.offer(ka); - ka = null; - }catch ( Exception x ) { - log.error("",x); } + socket = null; + if (running && !paused && ka!=null) { + keyCache.offer(ka); + } + ka = null; + } catch ( Exception x ) { + log.error("",x); } - } else if (handshake == -1 ) { - KeyAttachment ka = null; - if (key!=null) { - ka = (KeyAttachment) key.attachment(); - socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); - } - nioChannels.offer(socket); - socket = null; - if ( ka!=null ) keyCache.offer(ka); - ka = null; - } else { - final SelectionKey fk = key; - final int intops = handshake; - final KeyAttachment ka = (KeyAttachment)fk.attachment(); - ka.getPoller().add(socket,intops); - } - }catch(CancelledKeyException cx) { - socket.getPoller().cancelledKey(key,null,false); - } catch (OutOfMemoryError oom) { + } + } else if (handshake == -1 ) { + if (key != null) { + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); + } + nioChannels.offer(socket); + socket = null; + if ( ka!=null ) keyCache.offer(ka); + ka = null; + } else { + ka.getPoller().add(socket, handshake); + } + }catch(CancelledKeyException cx) { + socket.getPoller().cancelledKey(key,null,false); + } catch (OutOfMemoryError oom) { + try { + oomParachuteData = null; + log.error("", oom); + if (socket != null) { + socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); + } + releaseCaches(); + }catch ( Throwable oomt ) { try { - oomParachuteData = null; - log.error("", oom); - if (socket != null) { - socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); - } - releaseCaches(); - }catch ( Throwable oomt ) { - try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - }catch (Throwable letsHopeWeDontGetHere){ - ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); - } + System.err.println(oomParachuteMsg); + oomt.printStackTrace(); + }catch (Throwable letsHopeWeDontGetHere){ + ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } - } catch (VirtualMachineError vme) { - ExceptionUtils.handleThrowable(vme); - }catch ( Throwable t ) { - log.error("",t); + } + } catch (VirtualMachineError vme) { + ExceptionUtils.handleThrowable(vme); + }catch ( Throwable t ) { + log.error("",t); + if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); - } finally { - if (launch) { - try { - getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); - } catch (NullPointerException npe) { - if (running) { - log.error(sm.getString("endpoint.launch.fail"), - npe); - } + } + } finally { + if (launch) { + try { + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); + } catch (NullPointerException npe) { + if (running) { + log.error(sm.getString("endpoint.launch.fail"), + npe); } } - socket = null; - status = null; - //return to cache + } + socket = null; + status = null; + //return to cache + if (running && !paused) { processorCache.offer(this); } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org