Author: fhanik
Date: Thu May 31 12:32:33 2007
New Revision: 543226

URL: http://svn.apache.org/viewvc?view=rev&rev=543226
Log:
1. Timeouts are now per connection; fixed timeouts are no longer used anywhere. By 
default the connection gets the timeout defined in server.xml
2. Implemented all Comet operations, including the ability to have none
3. Implemented CometEvent.isReadable and isWriteable
   isAvailable - means data is available to the servlet
   isReadable - means there is data from the socket; it also checks the socket by 
doing a non-blocking read to verify that this is true
   isWriteable - false when the last write attempted on this socket wrote 0 bytes, 
hence a further write would probably block
4. simplified CometEvent.register/unregister, they are now just one call and no 
syncs
5. After each event, the connection is registered with the same operations it 
had before
6. CoyoteAdapter respects when the servlet doesn't want to be notified of the 
READ event, hence it doesn't invoke it automatically
7. Let me know if MutableBoolean and MutableInteger should be elsewhere (in 
terms of package). They are used since ActionHook doesn't have a return value, 
and they are also valuable in the output buffers since SSL writing is two steps, one 
through the engine and the other to the socket
I'm pretty happy with how isReadable and isWriteable work; they are completely non 
blocking and very accurate
True non blocking in the buffers and filters seems like a major surgery, still 
holding off on that.
Need to fix the NioBlockingSelector as it is almost impossible to make the 
poller interest declaration thread safe


Added:
    tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java
    tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java
Modified:
    tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
    tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
    tomcat/trunk/java/org/apache/catalina/connector/Request.java
    tomcat/trunk/java/org/apache/catalina/connector/Response.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/Response.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java

Modified: tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java Thu May 
31 12:32:33 2007
@@ -29,6 +29,7 @@
 import org.apache.coyote.ActionCode;
 import org.apache.tomcat.util.net.PollerInterest;
 import java.util.Arrays;
+import org.apache.tomcat.util.MutableBoolean;
 
 public class CometEventImpl implements CometEvent {
 
@@ -142,12 +143,16 @@
     }
     
     public boolean isReadable() {
-        return request.isReadable();
+        return request.isAvailable() || request.isReadable();
     }    
     public boolean isWriteable() {
         return response.isWriteable();
     }
     
+    public boolean hasOp(CometEvent.CometOperation op) {
+        return cometOperations.contains(op);
+    }
+    
     public void configure(CometEvent.CometConfiguration... options)
         throws IOException, IllegalStateException {
         checkWorkerThread();
@@ -169,7 +174,7 @@
         throws IOException, IllegalStateException {
         //remove from the registered set
         cometOperations.removeAll(Arrays.asList(operations));
-        request.action(ActionCode.ACTION_COMET_UNREGISTER, 
translate(cometOperations.toArray(new CometOperation[0])));
+        request.action(ActionCode.ACTION_COMET_REGISTER, 
translate(cometOperations.toArray(new CometOperation[0])));
     }
     
     public CometConfiguration[] getConfiguration() {

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Thu May 
31 12:32:33 2007
@@ -227,7 +227,7 @@
                 }
                 if (response.isClosed() || !request.isComet()) {
                     res.action(ActionCode.ACTION_COMET_END, null);
-                } else if (!error && read && request.isReadable()) {
+                } else if (!error && read && request.isAvailable()) {
                     // If this was a read and not all bytes have been read, or 
if no data
                     // was read from the connector, then it is an error
                     error = true;
@@ -312,7 +312,7 @@
 
                 if (request.isComet()) {
                     if (!response.isClosed() && !response.isError()) {
-                        if (request.isReadable()) {
+                        if (request.isAvailable() && 
request.hasOp(CometEvent.CometOperation.OP_READ)) {
                             // Invoke a read event right away if there are 
available bytes
                             if (event(req, res, SocketStatus.OPEN_READ)) {
                                 comet = true;

Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Thu May 
31 12:32:33 2007
@@ -324,13 +324,6 @@
     }
 
     
-    /**
-     * Return the amount of bytes written by the lower layer.
-     */
-    protected int lastWrite() {
-        return coyoteResponse.getLastWrite();
-    }
-    
 
     // ------------------------------------------------- Bytes Handling Methods
 

Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Thu May 31 
12:32:33 2007
@@ -70,6 +70,8 @@
 import org.apache.catalina.util.RequestUtil;
 import org.apache.catalina.util.StringManager;
 import org.apache.catalina.util.StringParser;
+import org.apache.tomcat.util.MutableBoolean;
+import org.apache.catalina.CometEvent;
 
 
 /**
@@ -2250,13 +2252,26 @@
 
     
     /**
-     * Return true if bytes are available.
+     * Return true if bytes are available at the servlet layer
      */
-    public boolean isReadable() {
+    public boolean isAvailable() {
         return (inputBuffer.available() > 0);
     }
-
     
+    /**
+     * returns true if we read data from the socket
+     * @return boolean
+     */
+    public boolean isReadable() {
+        MutableBoolean bool = new MutableBoolean(false);
+        action(ActionCode.ACTION_COMET_READABLE,bool);
+        return bool.get();    
+    }
+
+    public boolean hasOp(CometEvent.CometOperation op) {
+        if ( !comet || getEvent()==null ) return false;
+        return event.hasOp(op);
+    }
     // ------------------------------------------------------ Protected Methods
 
     protected void action(ActionCode actionCode, Object param) {

Modified: tomcat/trunk/java/org/apache/catalina/connector/Response.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Response.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Response.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Thu May 31 
12:32:33 2007
@@ -52,6 +52,8 @@
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.http.ServerCookie;
 import org.apache.tomcat.util.net.URL;
+import org.apache.coyote.ActionCode;
+import org.apache.tomcat.util.MutableBoolean;
 
 /**
  * Wrapper object for the Coyote response.
@@ -533,7 +535,9 @@
      * Return true if bytes are available.
      */
     public boolean isWriteable() {
-        return (outputBuffer.lastWrite() > 0);
+        MutableBoolean bool = new MutableBoolean(false);
+        coyoteResponse.action(ActionCode.ACTION_COMET_WRITEABLE,bool);
+        return bool.get();    
     }
 
     

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu May 31 12:32:33 2007
@@ -167,11 +167,14 @@
     public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26);
     
     /**
-     * Unregister for notifications for a comet connection
+     * Action for getting the readable status
      */
-    public static final ActionCode ACTION_COMET_UNREGISTER = new 
ActionCode(27);
-    
+    public static final ActionCode ACTION_COMET_READABLE = new ActionCode(28);
 
+    /**
+     * Action for getting the writeable status
+     */
+    public static final ActionCode ACTION_COMET_WRITEABLE = new ActionCode(29);
 
     // ----------------------------------------------------------- Constructors
     int code;

Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Thu May 31 12:32:33 2007
@@ -123,8 +123,6 @@
     protected String errorURI = null;
 
     protected Request req;
-
-    protected int lastWrite = 1;
     
     // ------------------------------------------------------------- Properties
 
@@ -190,16 +188,6 @@
     // -------------------- State --------------------
 
 
-    public int getLastWrite() {
-        return lastWrite;
-    }
-
-    
-    public void setLastWrite(int lastWrite) {
-        this.lastWrite = lastWrite;
-    }
-
-
     public int getStatus() {
         return status;
     }
@@ -591,7 +579,6 @@
         headers.clear();
 
         // update counters
-        lastWrite = 1;
         bytesWritten=0;
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Thu May 
31 12:32:33 2007
@@ -54,6 +54,7 @@
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.net.PollerInterest;
+import org.apache.tomcat.util.MutableBoolean;
 
 
 /**
@@ -91,12 +92,12 @@
 
         request = new Request();
         int readTimeout = endpoint.getSoTimeout();
-        inputBuffer = new InternalNioInputBuffer(request, 
maxHttpHeaderSize,readTimeout);
+        inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize);
         request.setInputBuffer(inputBuffer);
 
         response = new Response();
         response.setHook(this);
-        outputBuffer = new InternalNioOutputBuffer(response, 
maxHttpHeaderSize,readTimeout);
+        outputBuffer = new InternalNioOutputBuffer(response, 
maxHttpHeaderSize);
         response.setOutputBuffer(outputBuffer);
         request.setResponse(response);
 
@@ -819,7 +820,6 @@
             try {
                 if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                     
socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
                 if (!inputBuffer.parseRequestLine(keptAlive)) {
                     //no data available yet, since we might have read part
@@ -839,7 +839,6 @@
                 request.setStartTime(System.currentTimeMillis());
                 if (!disableUploadTimeout) { //only for body, not for request 
headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
             } catch (IOException e) {
                 error = true;
@@ -1223,20 +1222,22 @@
         } else if (actionCode == ActionCode.ACTION_COMET_REGISTER) {
             int interest = getPollerInterest(param);
             NioEndpoint.KeyAttachment attach = 
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
-            attach.setCometOps(attach.getCometOps()|interest);
-            //notify poller if not on a tomcat thread
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
-                socket.getPoller().cometInterest(socket);
-        } else if (actionCode == ActionCode.ACTION_COMET_UNREGISTER) {
-            int interest = getPollerInterest(param);
-            NioEndpoint.KeyAttachment attach = 
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
-            attach.setCometOps(attach.getCometOps()& (~interest));
+            attach.setCometOps(interest);
             //notify poller if not on a tomcat thread
             RequestInfo rp = request.getRequestProcessor();
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
                 socket.getPoller().cometInterest(socket);
         } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE) {
+        } else if (actionCode == ActionCode.ACTION_COMET_READABLE) {
+            MutableBoolean bool = (MutableBoolean)param;
+            try {
+                bool.set(inputBuffer.isReadable());
+            }catch ( IOException x ) {
+                throw new RuntimeException(x);
+            }
+        } else if (actionCode == ActionCode.ACTION_COMET_WRITEABLE) {
+            MutableBoolean bool = (MutableBoolean)param;
+            bool.set(outputBuffer.isWritable());
         }
 
     }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Thu 
May 31 12:32:33 2007
@@ -30,6 +30,7 @@
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.net.NioEndpoint;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -51,8 +52,7 @@
     /**
      * Alternate constructor.
      */
-    public InternalNioInputBuffer(Request request, int headerBufferSize, 
-                                  long readTimeout) {
+    public InternalNioInputBuffer(Request request, int headerBufferSize) {
 
         this.request = request;
         headers = request.getMimeHeaders();
@@ -80,12 +80,6 @@
         headerData.recycle();
         swallowInput = true;
 
-        if (readTimeout < 0) {
-            this.readTimeout = -1;
-        } else {
-            this.readTimeout = readTimeout;
-        }
-
     }
 
 
@@ -195,12 +189,6 @@
     protected int lastActiveFilter;
 
 
-    /**
-     * The socket timeout used when reading the first block of the request
-     * header.
-     */
-    protected long readTimeout;
-
     // ------------------------------------------------------------- Properties
 
 
@@ -296,7 +284,23 @@
     }
 
     // --------------------------------------------------------- Public Methods
-
+    /**
+     * Returns true if there are bytes available from the socket layer
+     * @return boolean
+     * @throws IOException
+     */
+    public boolean isReadable() throws IOException {
+        return (pos < lastValid) || (nbRead()>0);
+    }
+    
+    /**
+     * Issues a non blocking read
+     * @return int
+     * @throws IOException
+     */
+    public int nbRead() throws IOException {
+        return readSocket(true,false);
+    }
 
     /**
      * Recycle the input buffer. This should be called when closing the 
@@ -413,13 +417,8 @@
                     if (useAvailableData) {
                         return false;
                     }
-                    if (readTimeout == -1) {
-                        if (!fill(false,true)) //request line parsing
-                            throw new 
EOFException(sm.getString("iib.eof.error"));
-                    } else {
-                        // Do a simple read with a short timeout
-                        if ( !readSocket(true, false) ) return false;
-                    }
+                    // Do a simple read with a short timeout
+                    if ( readSocket(true, false)==0 ) return false;
                 }
                 chr = buf[pos++];
             } while ((chr == Constants.CR) || (chr == Constants.LF));
@@ -434,13 +433,8 @@
                 if (useAvailableData) {
                     return false;
                 }
-                if (readTimeout == -1) {
-                    if (!fill(false,false)) //request line parsing
-                        return false;
-                } else {
-                    // Do a simple read with a short timeout
-                    if ( !readSocket(true, false) ) return false;
-                }
+                // Do a simple read with a short timeout
+                if ( readSocket(true, false)==0 ) return false;
             }
             parsingRequestLinePhase = 2;
         }
@@ -552,6 +546,7 @@
             tmp = null;
         }
     }
+    
     /**
      * Perform blocking read with a timeout if desired
      * @param timeout boolean - if we want to use the timeout data
@@ -560,15 +555,16 @@
      * @throws IOException if a socket exception occurs
      * @throws EOFException if end of stream is reached
      */
-    private boolean readSocket(boolean timeout, boolean block) throws 
IOException {
+    private int readSocket(boolean timeout, boolean block) throws IOException {
         int nRead = 0;
-        long rto = timeout?this.readTimeout:-1;
         socket.getBufHandler().getReadBuffer().clear();
         if ( block ) {
             Selector selector = null;
             try { selector = getSelectorPool().get(); }catch ( IOException x ) 
{}
             try {
-                nRead = 
getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto);
+                NioEndpoint.KeyAttachment att = 
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                if ( att == null ) throw new IOException("Key must be 
cancelled.");
+                nRead = 
getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
             } catch ( EOFException eof ) {
                 nRead = -1;
             } finally { 
@@ -583,12 +579,12 @@
             expand(nRead + pos);
             socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
             lastValid = pos + nRead;
-            return true;
+            return nRead;
         } else if (nRead == -1) {
             //return false;
             throw new EOFException(sm.getString("iib.eof.error"));
         } else {
-            return false;
+            return 0;
         }
     }
 
@@ -852,7 +848,7 @@
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(timeout,block);
+            read = readSocket(timeout,block)>0;
         } else {
 
             if (buf.length - end < 4500) {
@@ -865,7 +861,7 @@
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(timeout, block);
+            read = readSocket(timeout, block)>0;
         }
         return read;
     }

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu 
May 31 12:32:33 2007
@@ -34,6 +34,8 @@
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import java.io.EOFException;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * Output buffer.
@@ -56,14 +58,14 @@
      * Default constructor.
      */
     public InternalNioOutputBuffer(Response response) {
-        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
+        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
     }
 
 
     /**
      * Alternate constructor.
      */
-    public InternalNioOutputBuffer(Response response, int headerBufferSize, 
long writeTimeout) {
+    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
 
         this.response = response;
         headers = response.getMimeHeaders();
@@ -86,8 +88,6 @@
         committed = false;
         finished = false;
         
-        this.writeTimeout = writeTimeout;
-
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
 
@@ -142,6 +142,10 @@
      */
     protected int pos;
 
+    /**
+     * Number of bytes last written
+     */
+    protected MutableInteger lastWrite = new MutableInteger(1);
 
     /**
      * Underlying socket.
@@ -179,12 +183,6 @@
      */
     protected int lastActiveFilter;
     
-    /**
-     * Write time out in milliseconds
-     */
-    protected long writeTimeout = -1;
-
-
     // ------------------------------------------------------------- Properties
 
 
@@ -195,10 +193,6 @@
         this.socket = socket;
     }
 
-    public void setWriteTimeout(long writeTimeout) {
-        this.writeTimeout = writeTimeout;
-    }
-
     /**
      * Get the underlying socket input stream.
      */
@@ -206,10 +200,6 @@
         return socket;
     }
 
-    public long getWriteTimeout() {
-        return writeTimeout;
-    }
-
     public void setSelectorPool(NioSelectorPool pool) { 
         this.pool = pool;
     }
@@ -324,7 +314,6 @@
 
         // Recycle Request object
         response.recycle();
-
     }
 
 
@@ -343,6 +332,7 @@
         lastActiveFilter = -1;
         committed = false;
         finished = false;
+        lastWrite.set(1);
 
     }
 
@@ -401,7 +391,9 @@
 
     }
 
-
+    public boolean isWritable() {
+        return lastWrite.get()>0;
+    }
     // ------------------------------------------------ HTTP/1.1 Output Methods
 
 
@@ -414,15 +406,25 @@
         if (!committed) {
             //Socket.send(socket, Constants.ACK_BYTES, 0, 
Constants.ACK_BYTES.length) < 0
             ByteBuffer buf = 
ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
-            writeToSocket(buf,false);
+            writeToSocket(buf,false,true);
         }
 
     }
 
-    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean 
flip) throws IOException {
+    /**
+     * 
+     * @param bytebuffer ByteBuffer
+     * @param flip boolean
+     * @return int
+     * @throws IOException
+     */
+    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean 
flip, boolean block) throws IOException {
         //int limit = bytebuffer.position();
         if ( flip ) bytebuffer.flip();
         int written = 0;
+        NioEndpoint.KeyAttachment att = 
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
+        if ( att == null ) throw new IOException("Key must be cancelled");
+        long writeTimeout = att.getTimeout();
         Selector selector = null;
         try {
             selector = getSelectorPool().get();
@@ -430,10 +432,10 @@
             //ignore
         }
         try {
-            written = getSelectorPool().write(bytebuffer, socket, selector, 
writeTimeout);
+            written = getSelectorPool().write(bytebuffer, socket, selector, 
writeTimeout, block,lastWrite);
             //make sure we are flushed 
             do {
-                if (socket.flush(true,selector,writeTimeout)) break;
+                if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
             }while ( true );
         }finally { 
             if ( selector != null ) getSelectorPool().put(selector);
@@ -759,7 +761,7 @@
 
         //write to the socket, if there is anything to write
         if (socket.getBufHandler().getWriteBuffer().position() > 0) {
-            writeToSocket(socket.getBufHandler().getWriteBuffer(),true);
+            writeToSocket(socket.getBufHandler().getWriteBuffer(),true,true);
         }
     }
 

Added: tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java?view=auto&rev=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java Thu May 31 
12:32:33 2007
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util;
+
+public class MutableBoolean {
+    protected boolean value = false;
+    public MutableBoolean() {}
+    public MutableBoolean(boolean val) {
+        this.value = val;
+    }
+    
+    public boolean get() { return value;}
+    public void set(boolean val) {this.value = val;}
+}

Added: tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java?view=auto&rev=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java Thu May 31 
12:32:33 2007
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util;
+
+public class MutableInteger {
+    protected int value = 0;
+    public MutableInteger() {}
+    public MutableInteger(int val) {
+        this.value = val;
+    }
+
+    public int get() { return value;}
+    public void set(int val) {this.value = val;}
+}

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Thu 
May 31 12:32:33 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.MutableInteger;
 
 public class NioBlockingSelector {
     public NioBlockingSelector() {
@@ -41,7 +42,7 @@
      * @throws SocketTimeoutException if the write times out
      * @throws IOException if an IO Exception occurs in the underlying socket 
logic
      */
-    public static int write(ByteBuffer buf, NioChannel socket, long 
writeTimeout) throws IOException {
+    public static int write(ByteBuffer buf, NioChannel socket, long 
writeTimeout,MutableInteger lastWrite) throws IOException {
         SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         int written = 0;
         boolean timedout = false;
@@ -55,6 +56,7 @@
             while ( (!timedout) && buf.hasRemaining()) {
                 if (keycount > 0) { //only write if we were registered for a 
write
                     int cnt = socket.write(buf); //write the data
+                    lastWrite.set(cnt);
                     if (cnt == -1)
                         throw new EOFException();
                     written += cnt;

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java Thu May 31 
12:32:33 2007
@@ -27,6 +27,7 @@
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import java.nio.channels.Selector;
 import java.nio.channels.SelectionKey;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -70,7 +71,8 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s,long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
+        if (lastWrite!=null) lastWrite.set(1);
         return true; //no network buffer in the regular channel
     }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu May 31 12:32:33 2007
@@ -479,24 +479,13 @@
     /**
      * The socket poller.
      */
-    protected Poller[] pollers = null;
-    protected int pollerRoundRobin = 0;
+    protected Poller poller = null;
     public Poller getPoller0() {
-        pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
-        Poller poller = pollers[pollerRoundRobin];
         return poller;
     }
 
-
-    /**
-     * The socket poller used for Comet support.
-     */
-    public Poller getCometPoller0() {
-        Poller poller = getPoller0();
-        return poller;
-    }
-
-
+    protected Poller readWritePoller = null;
+    
     /**
      * Dummy maxSpareThreads property.
      */
@@ -649,14 +638,10 @@
      * Number of keepalive sockets.
      */
     public int getKeepAliveCount() {
-        if (pollers == null) {
+        if (poller == null) {
             return 0;
         } else {
-            int keepAliveCount = 0;
-            for (int i = 0; i < pollers.length; i++) {
-                keepAliveCount += pollers[i].getKeepAliveCount();
-            }
-            return keepAliveCount;
+                return poller.selector.keys().size();
         }
     }
 
@@ -793,16 +778,12 @@
                 acceptorThread.start();
             }
 
-            // Start poller threads
-            pollers = new Poller[pollerThreadCount];
-            for (int i = 0; i < pollerThreadCount; i++) {
-                pollers[i] = new Poller();
-                pollers[i].init();
-                Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
-                pollerThread.setPriority(threadPriority);
-                pollerThread.setDaemon(true);
-                pollerThread.start();
-            }
+            // Start poller thread
+            poller = new Poller();
+            Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
+            pollerThread.setPriority(threadPriority);
+            pollerThread.setDaemon(true);
+            pollerThread.start();
         }
     }
 
@@ -836,10 +817,8 @@
         if (running) {
             running = false;
             unlockAccept();
-            for (int i = 0; i < pollers.length; i++) {
-                pollers[i].destroy();
-            }
-            pollers = null;
+                poller.destroy();
+            poller = null;
         }
         eventCache.clear();
         keyCache.clear();
@@ -1118,6 +1097,8 @@
     
     protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
         try {
+            KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
+            attachment.setCometNotify(false); //will get reset upon next reg
             if (executor == null) {
                 getWorkerThread().assign(socket, status);
             } else {
@@ -1248,8 +1229,9 @@
                             interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                             att.access();//to prevent timeout
                             //we are registering the key to start with, reset the fairness counter.
-                            att.interestOps(interestOps);
-                            key.interestOps(interestOps);
+                            int ops = key.interestOps() | interestOps;
+                            att.interestOps(ops);
+                            key.interestOps(ops);
                         } else {
                             cancel = true;
                         }
@@ -1269,6 +1251,7 @@
             return super.toString()+"[intOps="+this.interestOps+"]";
         }
     }
+    
     /**
      * Poller class.
      */
@@ -1279,9 +1262,6 @@
         
         protected boolean close = false;
         protected long nextExpiration = 0;//optimize expiration handling
-
-        protected int keepAliveCount = 0;
-        public int getKeepAliveCount() { return keepAliveCount; }
         
         protected AtomicLong wakeupCounter = new AtomicLong(0l);
         
@@ -1296,14 +1276,6 @@
         public Selector getSelector() { return selector;}
 
         /**
-         * Create the poller. With some versions of APR, the maximum poller size will
-         * be 62 (reocmpiling APR is necessary to remove this limitation).
-         */
-        protected void init() {
-            keepAliveCount = 0;
-        }
-
-        /**
          * Destroy the poller.
          */
         protected void destroy() {
@@ -1379,7 +1351,7 @@
             socket.setPoller(this);
             KeyAttachment key = keyCache.poll();
             final KeyAttachment ka = key!=null?key:new KeyAttachment();
-            ka.reset(this,socket);
+            ka.reset(this,socket,getSocketProperties().getSoTimeout());
             PollerEvent r = eventCache.poll();
             ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
             if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
@@ -1621,7 +1593,6 @@
                     } else if ( ka.getError() ) {
                         cancelledKey(key, SocketStatus.ERROR,true);
                     } else if (ka.getComet() && ka.getCometNotify() ) {
-                        ka.setCometNotify(false);//this will get reset after invokation if callback is still in there
                         reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation
                         if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
                     }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
@@ -1656,13 +1627,13 @@
         public KeyAttachment() {
             
         }
-        public void reset(Poller poller, NioChannel channel) {
+        public void reset(Poller poller, NioChannel channel, long soTimeout) {
             this.channel = channel;
             this.poller = poller;
             lastAccess = System.currentTimeMillis();
             currentAccess = false;
             comet = false;
-            timeout = -1;
+            timeout = soTimeout;
             error = false;
             lastRegistered = 0;
             sendfileData = null;
@@ -1676,7 +1647,7 @@
         }
         
         public void reset() {
-            reset(null,null);
+            reset(null,null,-1);
         }
         
         public Poller getPoller() { return poller;}

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Thu May 31 12:32:33 2007
@@ -16,17 +16,20 @@
  */
 package org.apache.tomcat.util.net;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.channels.Selector;
+import java.io.EOFException;
 import java.io.IOException;
-import java.util.NoSuchElementException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.io.EOFException;
-import java.net.SocketTimeoutException;
+import java.nio.channels.Selector;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
+import java.util.Iterator;
 
 /**
  *
@@ -37,20 +40,23 @@
  */
 
 public class NioSelectorPool {
+    protected static int threadCount = 0;
+    
     protected static Log log = LogFactory.getLog(NioSelectorPool.class);
 
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
-    protected static Selector SHARED_SELECTOR;
+    protected Selector SHARED_SELECTOR;
     
     protected int maxSelectors = 200;
+    protected long sharedSelectorTimeout = 30000;
     protected int maxSpareSelectors = -1;
     protected boolean enabled = true;
     protected AtomicInteger active = new AtomicInteger(0);
     protected AtomicInteger spare = new AtomicInteger(0);
     protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
 
-    protected static Selector getSharedSelector() throws IOException {
+    protected Selector getSharedSelector() throws IOException {
         if (SHARED && SHARED_SELECTOR == null) {
             synchronized ( NioSelectorPool.class ) {
                 if ( SHARED_SELECTOR == null )  {
@@ -127,12 +133,13 @@
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException {
-        return write(buf,socket,selector,writeTimeout,true);
+        return write(buf,socket,selector,writeTimeout,true,null);
     }
     
-    public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
-            return NioBlockingSelector.write(buf,socket,writeTimeout);
+    public int write(ByteBuffer buf, NioChannel socket, Selector selector, 
+                     long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
+        if ( SHARED && block ) {
+            return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
         }
         SelectionKey key = null;
         int written = 0;
@@ -148,7 +155,9 @@
                 int cnt = 0;
                 if ( keycount > 0 ) { //only write if we were registered for a write
                     cnt = socket.write(buf); //write the data
+                    if (lastWrite!=null) lastWrite.set(cnt);
                     if (cnt == -1) throw new EOFException();
+                    
                     written += cnt;
                     if (cnt > 0) {
                         time = System.currentTimeMillis(); //reset our timeout timer
@@ -206,7 +215,7 @@
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
+        if ( SHARED && block ) {
             return NioBlockingSelector.read(buf,socket,readTimeout);
         }
         SelectionKey key = null;
@@ -254,6 +263,10 @@
         this.enabled = enabled;
     }
 
+    public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
+        this.sharedSelectorTimeout = sharedSelectorTimeout;
+    }
+
     public int getMaxSelectors() {
         return maxSelectors;
     }
@@ -264,5 +277,17 @@
 
     public boolean isEnabled() {
         return enabled;
+    }
+
+    public long getSharedSelectorTimeout() {
+        return sharedSelectorTimeout;
+    }
+
+    public ConcurrentLinkedQueue getSelectors() {
+        return selectors;
+    }
+
+    public AtomicInteger getSpare() {
+        return spare;
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?view=diff&rev=543226&r1=543225&r2=543226
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Thu May 31 12:32:33 2007
@@ -25,6 +25,7 @@
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import java.nio.channels.Selector;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -102,11 +103,11 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s, long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
         if (!block) {
             flush(netOutBuffer);
         } else {
-            pool.write(netOutBuffer, this, s, timeout);
+            pool.write(netOutBuffer, this, s, timeout,block,lastWrite);
         }
         return !netOutBuffer.hasRemaining();
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to