Author: fhanik
Date: Wed May 9 09:46:06 2007
New Revision: 536580
URL: http://svn.apache.org/viewvc?view=rev&rev=536580
Log:
Separate out read vs write latches, simplify implementation, avoid concurrency
issues and prepare for new comet strategies
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=536580&r1=536579&r2=536580
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
Wed May 9 09:46:06 2007
@@ -66,20 +66,20 @@
if ( key == null ) throw new IOException("Key no longer
registered");
KeyAttachment att = (KeyAttachment) key.attachment();
try {
- if ( att.getLatch()==null || att.getLatch().getCount()==0)
att.startLatch(1,SelectionKey.OP_WRITE);
+ if ( att.getWriteLatch()==null ||
att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
//only register for write if a write has not yet been
issued
if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0)
socket.getPoller().add(socket,SelectionKey.OP_WRITE);
-
att.awaitLatch(writeTimeout,TimeUnit.MILLISECONDS,SelectionKey.OP_WRITE);
+ att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
}
- if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+ if ( att.getWriteLatch()!=null &&
att.getWriteLatch().getCount()> 0) {
//we got interrupted, but we haven't received notification
from the poller.
keycount = 0;
}else {
//latch countdown has happened
keycount = 1;
- att.resetLatch();
+ att.resetWriteLatch();
}
if (writeTimeout > 0 && (keycount == 0))
@@ -135,19 +135,19 @@
}
KeyAttachment att = (KeyAttachment) key.attachment();
try {
- if ( att.getLatch()==null || att.getLatch().getCount()==0)
att.startLatch(1,SelectionKey.OP_READ);
+ if ( att.getReadLatch()==null ||
att.getReadLatch().getCount()==0) att.startReadLatch(1);
if ( att.interestOps() == 0)
socket.getPoller().add(socket,SelectionKey.OP_READ);
- att.awaitLatch(readTimeout,TimeUnit.MILLISECONDS,
SelectionKey.OP_READ);
+ att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
}
- if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+ if ( att.getReadLatch()!=null &&
att.getReadLatch().getCount()> 0) {
//we got interrupted, but we haven't received notification
from the poller.
keycount = 0;
}else {
//latch countdown has happened
keycount = 1;
- att.resetLatch();
+ att.resetReadLatch();
}
if (readTimeout > 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >=
readTimeout;
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=536580&r1=536579&r2=536580
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed
May 9 09:46:06 2007
@@ -1495,9 +1495,12 @@
sk.attach(attachment);//cant remember why this is here
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
- if ( attachment.getLatch() != null ) {
- unreg(sk, attachment,attachment.getLatchOps());
- attachment.getLatch().countDown();
+ if ( sk.isReadable() && attachment.getReadLatch() !=
null ) {
+ unreg(sk, attachment,SelectionKey.OP_READ);
+ attachment.getReadLatch().countDown();
+ } else if ( sk.isWritable() &&
attachment.getWriteLatch() != null ) {
+ unreg(sk, attachment,SelectionKey.OP_WRITE);
+ attachment.getWriteLatch().countDown();
} else if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment,true);
} else if ( attachment.getComet() ) {
@@ -1650,9 +1653,10 @@
fairness = 0;
lastRegistered = 0;
sendfileData = null;
- if ( latch!=null ) try {latch.countDown();}catch (Exception
ignore){}
- latch = null;
- latchOps = 0;
+ if ( readLatch!=null ) try {for (int i=0;
i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception
ignore){}
+ readLatch = null;
+ if ( writeLatch!=null ) try {for (int i=0;
i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception
ignore){}
+ writeLatch = null;
}
public void reset() {
@@ -1679,25 +1683,32 @@
protected int interestOps = 0;
public int interestOps() { return interestOps;}
public int interestOps(int ops) { this.interestOps = ops; return ops;
}
- public CountDownLatch getLatch() { return latch; }
- public void resetLatch() {
- if ( latch.getCount() == 0 ) latch = null;
+ public CountDownLatch getReadLatch() { return readLatch; }
+ public CountDownLatch getWriteLatch() { return writeLatch; }
+ protected CountDownLatch resetLatch(CountDownLatch latch) {
+ if ( latch.getCount() == 0 ) return null;
else throw new IllegalStateException("Latch must be at count 0");
- latchOps = 0;
}
- public void startLatch(int cnt, int latchOps) {
+ public void resetReadLatch() { readLatch = resetLatch(readLatch); }
+ public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
+
+ protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
if ( latch == null || latch.getCount() == 0 ) {
- this.latch = new CountDownLatch(cnt);
- this.latchOps = latchOps;
+ return new CountDownLatch(cnt);
}
else throw new IllegalStateException("Latch must be at count 0 or
null.");
}
- public void awaitLatch(long timeout, TimeUnit unit, int latchOps)
throws InterruptedException {
+ public void startReadLatch(int cnt) { readLatch =
startLatch(readLatch,cnt);}
+ public void startWriteLatch(int cnt) { writeLatch =
startLatch(writeLatch,cnt);}
+
+
+ protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit
unit) throws InterruptedException {
if ( latch == null ) throw new IllegalStateException("Latch cannot
be null");
- this.latchOps = this.latchOps | latchOps;
latch.await(timeout,unit);
}
- public int getLatchOps() { return latchOps;}
+ public void awaitReadLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(readLatch,timeout,unit);}
+ public void awaitWriteLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+
public int getFairness() { return fairness; }
public void setFairness(int f) { fairness = f;}
public void incFairness() { fairness++; }
@@ -1714,50 +1725,12 @@
protected long timeout = -1;
protected boolean error = false;
protected NioChannel channel = null;
- protected CountDownLatch latch = null;
- protected int latchOps = 0;
+ protected CountDownLatch readLatch = null;
+ protected CountDownLatch writeLatch = null;
protected int fairness = 0;
protected long lastRegistered = 0;
protected SendfileData sendfileData = null;
}
-// ----------------------------------------------------- Key Fairness
Comparator
- public static class KeyFairnessComparator implements
Comparator<SelectionKey> {
- public int compare(SelectionKey ska1, SelectionKey ska2) {
- KeyAttachment ka1 = (KeyAttachment)ska1.attachment();
- KeyAttachment ka2 = (KeyAttachment)ska2.attachment();
- if ( ka1 == null && ka2 == null ) return 0;
- if ( ka1 == null ) return 1; //invalid keys go last
- if ( ka2 == null ) return -1; //invalid keys go last
- long lr1 = ka1.getLastRegistered();
- long lr2 = ka2.getLastRegistered();
- int f1 = ka1.getFairness();
- int f2 = ka2.getFairness();
- CountDownLatch lat1 = ka1.getLatch();
- CountDownLatch lat2 = ka2.getLatch();
- if ( lat1 != null && lat2 != null ) {
- return 0;
- } else if ( lat1 != null && lat2 == null ) {
- //latches have highest priority
- return -1;
- } else if ( lat1 == null && lat2 != null ) {
- return 1;
- } else if ( f1 == f2 ) {
- if ( lr1 == lr2 ) return 0;
- //earlier objects have priorioty
- else return lr1<lr2?-1:1;
- } else {
- //higher fairness means earlier in the queue
- //as fairness count means how many times the poller has
skipped
- //this socket, and the socket has been ready, there just
hasn't
- //been any worker thread available to handle it
- return ka1.getFairness()>ka2.getFairness()?-1:1;
- }
- }
- }
-
-
-
-
// ----------------------------------------------------- Worker Inner Class
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]