Author: fhanik
Date: Wed May 9 09:46:06 2007
New Revision: 536580
URL: http://svn.apache.org/viewvc?view=rev&rev=536580
Log: Separate out read vs write latches, simplify implementation, avoid concurrency issues and prepare for new comet strategies
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=536580&r1=536579&r2=536580 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Wed May 9 09:46:06 2007 @@ -66,20 +66,20 @@ if ( key == null ) throw new IOException("Key no longer registered"); KeyAttachment att = (KeyAttachment) key.attachment(); try { - if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1,SelectionKey.OP_WRITE); + if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); //only register for write if a write has not yet been issued if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE); - att.awaitLatch(writeTimeout,TimeUnit.MILLISECONDS,SelectionKey.OP_WRITE); + att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); } - if ( att.getLatch()!=null && att.getLatch().getCount()> 0) { + if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) { //we got interrupted, but we haven't received notification from the poller. 
keycount = 0; }else { //latch countdown has happened keycount = 1; - att.resetLatch(); + att.resetWriteLatch(); } if (writeTimeout > 0 && (keycount == 0)) @@ -135,19 +135,19 @@ } KeyAttachment att = (KeyAttachment) key.attachment(); try { - if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1,SelectionKey.OP_READ); + if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1); if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ); - att.awaitLatch(readTimeout,TimeUnit.MILLISECONDS, SelectionKey.OP_READ); + att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); } - if ( att.getLatch()!=null && att.getLatch().getCount()> 0) { + if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) { //we got interrupted, but we haven't received notification from the poller. keycount = 0; }else { //latch countdown has happened keycount = 1; - att.resetLatch(); + att.resetReadLatch(); } if (readTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= readTimeout; Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=536580&r1=536579&r2=536580 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed May 9 09:46:06 2007 @@ -1495,9 +1495,12 @@ sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { - if ( attachment.getLatch() != null ) { - unreg(sk, attachment,attachment.getLatchOps()); - attachment.getLatch().countDown(); + if ( sk.isReadable() && attachment.getReadLatch() != null ) { + unreg(sk, 
attachment,SelectionKey.OP_READ); + attachment.getReadLatch().countDown(); + } else if ( sk.isWritable() && attachment.getWriteLatch() != null ) { + unreg(sk, attachment,SelectionKey.OP_WRITE); + attachment.getWriteLatch().countDown(); } else if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment,true); } else if ( attachment.getComet() ) { @@ -1650,9 +1653,10 @@ fairness = 0; lastRegistered = 0; sendfileData = null; - if ( latch!=null ) try {latch.countDown();}catch (Exception ignore){} - latch = null; - latchOps = 0; + if ( readLatch!=null ) try {for (int i=0; i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception ignore){} + readLatch = null; + if ( writeLatch!=null ) try {for (int i=0; i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception ignore){} + writeLatch = null; } public void reset() { @@ -1679,25 +1683,32 @@ protected int interestOps = 0; public int interestOps() { return interestOps;} public int interestOps(int ops) { this.interestOps = ops; return ops; } - public CountDownLatch getLatch() { return latch; } - public void resetLatch() { - if ( latch.getCount() == 0 ) latch = null; + public CountDownLatch getReadLatch() { return readLatch; } + public CountDownLatch getWriteLatch() { return writeLatch; } + protected CountDownLatch resetLatch(CountDownLatch latch) { + if ( latch.getCount() == 0 ) return null; else throw new IllegalStateException("Latch must be at count 0"); - latchOps = 0; } - public void startLatch(int cnt, int latchOps) { + public void resetReadLatch() { readLatch = resetLatch(readLatch); } + public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); } + + protected CountDownLatch startLatch(CountDownLatch latch, int cnt) { if ( latch == null || latch.getCount() == 0 ) { - this.latch = new CountDownLatch(cnt); - this.latchOps = latchOps; + return new CountDownLatch(cnt); } else throw new IllegalStateException("Latch must be at count 0 or null."); } - public void 
awaitLatch(long timeout, TimeUnit unit, int latchOps) throws InterruptedException { + public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);} + public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);} + + + protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException { if ( latch == null ) throw new IllegalStateException("Latch cannot be null"); - this.latchOps = this.latchOps | latchOps; latch.await(timeout,unit); } - public int getLatchOps() { return latchOps;} + public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);} + public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);} + public int getFairness() { return fairness; } public void setFairness(int f) { fairness = f;} public void incFairness() { fairness++; } @@ -1714,50 +1725,12 @@ protected long timeout = -1; protected boolean error = false; protected NioChannel channel = null; - protected CountDownLatch latch = null; - protected int latchOps = 0; + protected CountDownLatch readLatch = null; + protected CountDownLatch writeLatch = null; protected int fairness = 0; protected long lastRegistered = 0; protected SendfileData sendfileData = null; } -// ----------------------------------------------------- Key Fairness Comparator - public static class KeyFairnessComparator implements Comparator<SelectionKey> { - public int compare(SelectionKey ska1, SelectionKey ska2) { - KeyAttachment ka1 = (KeyAttachment)ska1.attachment(); - KeyAttachment ka2 = (KeyAttachment)ska2.attachment(); - if ( ka1 == null && ka2 == null ) return 0; - if ( ka1 == null ) return 1; //invalid keys go last - if ( ka2 == null ) return -1; //invalid keys go last - long lr1 = ka1.getLastRegistered(); - long lr2 = ka2.getLastRegistered(); - int f1 = ka1.getFairness(); - int f2 = ka2.getFairness(); - CountDownLatch lat1 = 
ka1.getLatch(); - CountDownLatch lat2 = ka2.getLatch(); - if ( lat1 != null && lat2 != null ) { - return 0; - } else if ( lat1 != null && lat2 == null ) { - //latches have highest priority - return -1; - } else if ( lat1 == null && lat2 != null ) { - return 1; - } else if ( f1 == f2 ) { - if ( lr1 == lr2 ) return 0; - //earlier objects have priorioty - else return lr1<lr2?-1:1; - } else { - //higher fairness means earlier in the queue - //as fairness count means how many times the poller has skipped - //this socket, and the socket has been ready, there just hasn't - //been any worker thread available to handle it - return ka1.getFairness()>ka2.getFairness()?-1:1; - } - } - } - - - - // ----------------------------------------------------- Worker Inner Class --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]