Author: fhanik Date: Tue Mar 13 14:16:47 2007 New Revision: 517873 URL: http://svn.apache.org/viewvc?view=rev&rev=517873 Log: cleaned up code a little bit, still haven't figured out a good fairness algorithm
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=517873&r1=517872&r2=517873 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Mar 13 14:16:47 2007 @@ -1290,58 +1290,12 @@ // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); - iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); - try { - if ( sk.isValid() && attachment != null ) { - attachment.access();//make sure we don't time out valid sockets - sk.attach(attachment);//cant remember why this is here - int interestOps = sk.interestOps();//get the interestops, in case we need to reset them - sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket - attachment.interestOps(0);//fast access interestp ops - NioChannel channel = attachment.getChannel(); - if (sk.isReadable() || sk.isWritable() ) { - if ( attachment.getComet() ) { - //check if thread is available - if ( isWorkerAvailable() ) { - if (!processSocket(channel, SocketStatus.OPEN)) - processSocket(channel, SocketStatus.DISCONNECT); - } else { - //increase the fairness counter - attachment.incFairness(); - //reregister it - attachment.interestOps(interestOps); - sk.interestOps(interestOps); - } - } else if ( attachment.getLatch() != null ) { - attachment.getLatch().countDown(); - } else { - //later on, improve latch behavior - if ( isWorkerAvailable() ) { - boolean close = (!processSocket(channel)); - if (close) { - channel.close(); - channel.getIOChannel().socket().close(); - } - } else { - //increase the fairness counter - attachment.incFairness(); - //reregister it - attachment.interestOps(interestOps); - sk.interestOps(interestOps); - } - } - } - } else { - //invalid key - cancelledKey(sk, SocketStatus.ERROR); - } - } catch ( CancelledKeyException ckx ) { - cancelledKey(sk, SocketStatus.ERROR); - } catch (Throwable t) { - log.error("",t); - } + if ( processKey(sk, attachment) ) { + iterator.remove(); //only remove it if the key was processed. + } }//while + //process timeouts timeout(keyCount,hasEvents); }//while @@ -1350,6 +1304,64 @@ } } + + protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { + boolean result = true; + try { + if ( sk.isValid() && attachment != null ) { + attachment.access();//make sure we don't time out valid sockets + sk.attach(attachment);//cant remember why this is here + NioChannel channel = attachment.getChannel(); + if (sk.isReadable() || sk.isWritable() ) { + if ( attachment.getComet() ) { + //check if thread is available + if ( isWorkerAvailable() ) { + unreg(sk, attachment); + if (!processSocket(channel, SocketStatus.OPEN)) + processSocket(channel, SocketStatus.DISCONNECT); + attachment.setFairness(0); + } else { + //increase the fairness counter + attachment.incFairness(); + result = false; + } + } else if ( attachment.getLatch() != null ) { + unreg(sk, attachment); + attachment.getLatch().countDown(); + } else { + //later on, improve latch behavior + if ( isWorkerAvailable() ) { + unreg(sk, attachment); + boolean close = (!processSocket(channel)); + if (close) { + channel.close(); + channel.getIOChannel().socket().close(); + } + attachment.setFairness(0); + } else { + //increase the fairness counter + attachment.incFairness(); + result = false; + } + } + } + } else { + //invalid key + cancelledKey(sk, SocketStatus.ERROR); + } + } catch ( CancelledKeyException ckx ) { + cancelledKey(sk, SocketStatus.ERROR); + } catch (Throwable t) { + log.error("",t); + } + return result; + } + + protected void unreg(SelectionKey sk, KeyAttachment attachment) { + sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket + attachment.interestOps(0);//fast access interestp ops + } + protected void timeout(int keyCount, boolean hasEvents) { long now = System.currentTimeMillis(); //don't process timeouts too frequently, but if the selector simply timed out @@ -1453,13 +1465,27 @@ protected long lastRegistered = 0; } // ----------------------------------------------------- Key Fairness Comparator - public static class KeyFairnessComparator implements Comparator<KeyAttachment> { - public int compare(KeyAttachment ka1, KeyAttachment ka2) { + public static class KeyFairnessComparator implements Comparator<SelectionKey> { + public int compare(SelectionKey ska1, SelectionKey ska2) { + KeyAttachment ka1 = (KeyAttachment)ska1.attachment(); + KeyAttachment ka2 = (KeyAttachment)ska2.attachment(); + if ( ka1 == null && ka2 == null ) return 0; + if ( ka1 == null ) return 1; //invalid keys go last + if ( ka2 == null ) return -1; //invalid keys go last long lr1 = ka1.getLastRegistered(); long lr2 = ka2.getLastRegistered(); int f1 = ka1.getFairness(); int f2 = ka2.getFairness(); - if ( f1 == f2 ) { + CountDownLatch lat1 = ka1.getLatch(); + CountDownLatch lat2 = ka2.getLatch(); + if ( lat1 != null && lat2 != null ) { + return 0; + } else if ( lat1 != null && lat2 == null ) { + //latches have highest priority + return -1; + } else if ( lat1 == null && lat2 != null ) { + return 1; + } else if ( f1 == f2 ) { if ( lr1 == lr2 ) return 0; //earlier objects have priorioty else return lr1<lr2?-1:1; @@ -1471,8 +1497,8 @@ return ka1.getFairness()>ka2.getFairness()?-1:1; } } - } + --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]