Author: fhanik
Date: Thu Nov 20 08:13:02 2008
New Revision: 719264

URL: http://svn.apache.org/viewvc?rev=719264&view=rev
Log:
Fixed read/write timeouts - backport of http://svn.apache.org/viewvc?view=rev&revision=707670
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=719264&r1=719263&r2=719264&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Thu Nov 20 08:13:02 2008 @@ -81,6 +81,7 @@ public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); + KeyReference reference = new KeyReference(); KeyAttachment att = (KeyAttachment) key.attachment(); int written = 0; boolean timedout = false; @@ -101,7 +102,7 @@ } try { if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); - poller.add(att,SelectionKey.OP_WRITE); + poller.add(att,SelectionKey.OP_WRITE,reference); att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -122,9 +123,10 @@ throw new SocketTimeoutException(); } finally { poller.remove(att,SelectionKey.OP_WRITE); - if (timedout && key != null) { - poller.cancelKey(socket, key); + if (timedout && reference.key!=null) { + poller.cancelKey(reference.key); } + reference.key = null; } return written; } @@ -145,6 +147,7 @@ public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); + KeyReference reference = new KeyReference(); KeyAttachment att = 
(KeyAttachment) key.attachment(); int read = 0; boolean timedout = false; @@ -162,7 +165,7 @@ } try { if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1); - poller.add(att,SelectionKey.OP_READ); + poller.add(att,SelectionKey.OP_READ, reference); att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -182,9 +185,10 @@ throw new SocketTimeoutException(); } finally { poller.remove(att,SelectionKey.OP_READ); - if (timedout && key != null) { - poller.cancelKey(socket,key); + if (timedout && reference.key!=null) { + poller.cancelKey(reference.key); } + reference.key = null; } return read; } @@ -193,10 +197,10 @@ protected class BlockPoller extends Thread { protected boolean run = true; protected Selector selector = null; - protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); + protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>(); public void disable() { run = false; selector.wakeup();} protected AtomicInteger wakeupCounter = new AtomicInteger(0); - public void cancelKey(final NioChannel socket, final SelectionKey key) { + public void cancelKey(final SelectionKey key) { Runnable r = new Runnable() { public void run() { key.cancel(); @@ -219,7 +223,7 @@ } } - public void add(final KeyAttachment key, final int ops) { + public void add(final KeyAttachment key, final int ops, final KeyReference ref) { Runnable r = new Runnable() { public void run() { if ( key == null ) return; @@ -231,6 +235,9 @@ try { if (sk == null) { sk = ch.register(selector, ops, key); + ref.key = sk; + } else if (!sk.isValid()) { + cancel(sk,key,ops); } else { sk.interestOps(sk.interestOps() | ops); } @@ -259,10 +266,15 @@ if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); } else { - sk.interestOps(sk.interestOps() & (~ops)); - if 
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); - if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); - if (sk.interestOps()==0) { + if (sk.isValid()) { + sk.interestOps(sk.interestOps() & (~ops)); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + if (sk.interestOps()==0) { + sk.cancel(); + sk.attach(null); + } + }else { sk.cancel(); sk.attach(null); } @@ -284,7 +296,7 @@ boolean result = false; Runnable r = null; result = (events.size() > 0); - while ( (r = (Runnable)events.poll()) != null ) { + while ( (r = events.poll()) != null ) { r.run(); result = true; } @@ -320,12 +332,12 @@ continue; } - Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; + Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. 
while (run && iterator != null && iterator.hasNext()) { - SelectionKey sk = (SelectionKey) iterator.next(); + SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { attachment.access(); @@ -353,15 +365,30 @@ }catch( Exception ignore ) { if (log.isDebugEnabled())log.debug("",ignore); } + try { + selector.close();//Close the connector + }catch( Exception ignore ) { + if (log.isDebugEnabled())log.debug("",ignore); + } } public void countDown(CountDownLatch latch) { if ( latch == null ) return; latch.countDown(); } + } + + public class KeyReference { + SelectionKey key = null; - - + @Override + public void finalize() { + if (key!=null && key.isValid()) { + log.warn("Possible key leak, cancelling key in the finalizer."); + try {key.cancel();}catch (Exception ignore){} + } + key = null; + } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]