Author: fhanik Date: Wed Feb 21 08:37:04 2007 New Revision: 510092 URL: http://svn.apache.org/viewvc?view=rev&rev=510092 Log: Fixed latch behavior, still could be improved upon. In the next revision, I'll probably have the blocking read/write selector use its own thread so that it doesn't contend with the poller thread
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=510092&r1=510091&r2=510092 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Wed Feb 21 08:37:04 2007 @@ -65,14 +65,21 @@ KeyAttachment att = (KeyAttachment) key.attachment(); try { - att.startLatch(1); - socket.getPoller().add(socket,SelectionKey.OP_WRITE); + if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1); + if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE); att.getLatch().await(writeTimeout,TimeUnit.MILLISECONDS); - att.resetLatch(); }catch (InterruptedException ignore) { + Thread.interrupted(); + } + if ( att.getLatch()!=null && att.getLatch().getCount()> 0) { + //we got interrupted, but we haven't received notification from the poller. 
+ keycount = 0; + }else { + //latch countdown has happened + keycount = 1; + att.resetLatch(); } - if ( att.getLatch() == null ) keycount = 1; - else keycount = 0; + if (writeTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= writeTimeout; } //while @@ -122,14 +129,20 @@ } KeyAttachment att = (KeyAttachment) key.attachment(); try { - att.startLatch(1); - socket.getPoller().add(socket,SelectionKey.OP_READ); + if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1); + if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ); att.getLatch().await(readTimeout,TimeUnit.MILLISECONDS); - att.resetLatch(); }catch (InterruptedException ignore) { + Thread.interrupted(); + } + if ( att.getLatch()!=null && att.getLatch().getCount()> 0) { + //we got interrupted, but we haven't received notification from the poller. + keycount = 0; + }else { + //latch countdown has happened + keycount = 1; + att.resetLatch(); } - if ( att.getLatch() == null ) keycount = 1; - else keycount = 0; if (readTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= readTimeout; } //while Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=510092&r1=510091&r2=510092 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Feb 21 08:37:04 2007 @@ -593,7 +593,7 @@ serverSock = ServerSocketChannel.open(); InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); - serverSock.socket().bind(addr,100); //todo, set backlog value + serverSock.socket().bind(addr,backlog); serverSock.configureBlocking(true); //mimic APR behavior // Initialize 
thread count defaults for acceptor, poller and sendfile @@ -852,6 +852,24 @@ /** + * Returns true if a worker thread is available for processing. + * @return boolean + */ + protected boolean isWorkerAvailable() { + if (workers.size() > 0) { + return true; + } + if ((maxThreads > 0) && (curThreads < maxThreads)) { + return true; + } else { + if (maxThreads < 0) { + return true; + } else { + return false; + } + } + } + /** * Create (or allocate) and return an available processor for use in * processing a specific HTTP request, if possible. If the maximum * allowed processors have already been created and are in use, return @@ -1013,6 +1031,8 @@ // Accept the next incoming connection from the server socket SocketChannel socket = serverSock.accept(); // Hand this socket off to an appropriate processor + //TODO FIXME - this is currently a blocking call, meaning we will be blocking + //further accepts until there is a thread available. if ( running && (!paused) && socket != null ) processSocket(socket); } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); @@ -1260,23 +1280,35 @@ if ( sk.isValid() && attachment != null ) { attachment.access(); sk.attach(attachment); + int interestOps = sk.interestOps(); sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket attachment.interestOps(0); NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getComet() ) { - if (!processSocket(channel, SocketStatus.OPEN)) - processSocket(channel, SocketStatus.DISCONNECT); + //check if thread is available + if ( isWorkerAvailable() ) { + if (!processSocket(channel, SocketStatus.OPEN)) + processSocket(channel, SocketStatus.DISCONNECT); + } else { + //reregister it + attachment.interestOps(interestOps); + sk.interestOps(interestOps); + } } else if ( attachment.getLatch() != null ) { attachment.getLatch().countDown(); } else { - //this sucker here dead locks with the count down 
latch - //since this call is blocking if no threads are available. - //TODO: FIXME BIG TIME - boolean close = (!processSocket(channel)); - if ( close ) { - channel.close(); - channel.getIOChannel().socket().close(); + //later on, improve latch behavior + if ( isWorkerAvailable() ) { + boolean close = (!processSocket(channel)); + if (close) { + channel.close(); + channel.getIOChannel().socket().close(); + } + } else { + //reregister it + attachment.interestOps(interestOps); + sk.interestOps(interestOps); } } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=510092&r1=510091&r2=510092 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Wed Feb 21 08:37:04 2007 @@ -41,23 +41,25 @@ protected final static boolean SHARED = Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue(); protected static Selector SHARED_SELECTOR; + + protected int maxSelectors = 200; + protected int maxSpareSelectors = -1; + protected boolean enabled = true; + protected AtomicInteger active = new AtomicInteger(0); + protected AtomicInteger spare = new AtomicInteger(0); + protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>(); + protected static Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { - SHARED_SELECTOR = Selector.open(); + SHARED_SELECTOR = Selector.open(); log.info("Using a shared selector for servlet write/read"); - } + } } } return SHARED_SELECTOR; } - protected int maxSelectors = 200; - protected int maxSpareSelectors = -1; - protected 
boolean enabled = true; - protected AtomicInteger active = new AtomicInteger(0); - protected AtomicInteger spare = new AtomicInteger(0); - protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>(); public Selector get() throws IOException{ if ( SHARED ) { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]