Author: remm
Date: Wed May 17 05:55:39 2006
New Revision: 407241

URL: http://svn.apache.org/viewcvs?rev=407241&view=rev
Log:
- Start work on comet support. Note: it doesn't work yet, I think (I didn't 
test), and most of this
  is very preliminary. It is relatively straightforward, though.

Added:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java   (with 
props)
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java   
(with props)
Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java

Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java?rev=407241&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java (added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java Wed May 
17 05:55:39 2006
@@ -0,0 +1,21 @@
+package org.apache.catalina;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public interface CometProcessor {
+
+    public void begin(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+    public void end(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+
+    public void error(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+    public void read(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 
Wed May 17 05:55:39 2006
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.catalina.CometProcessor;
 import org.apache.catalina.Context;
 import org.apache.catalina.Globals;
 import org.apache.catalina.Wrapper;
@@ -135,10 +136,35 @@
 
         }
 
+        // Comet processing
+        if (request.getWrapper() != null 
+                && request.getWrapper() instanceof CometProcessor) {
+            try {
+                if (request.getAttribute("org.apache.tomcat.comet.error") != 
null) {
+                    ((CometProcessor) 
request.getWrapper()).error(request.getRequest(), response.getResponse());
+                } else {
+                    ((CometProcessor) 
request.getWrapper()).read(request.getRequest(), response.getResponse());
+                }
+            } catch (IOException e) {
+                ;
+            } catch (Throwable t) {
+                log.error(sm.getString("coyoteAdapter.service"), t);
+            } finally {
+                // Recycle the wrapper request and response
+                if (request.getAttribute("org.apache.tomcat.comet") == null) {
+                    request.recycle();
+                    response.recycle();
+                }
+            }
+            return;
+        }
+        
         if (connector.getXpoweredBy()) {
             response.addHeader("X-Powered-By", "Servlet/2.5");
         }
 
+        boolean comet = false;
+        
         try {
 
             // Parse and set Catalina and configuration specific 
@@ -148,8 +174,16 @@
                 
connector.getContainer().getPipeline().getFirst().invoke(request, response);
             }
 
-            response.finishResponse();
-            req.action( ActionCode.ACTION_POST_REQUEST , null);
+            if (request.getAttribute("org.apache.tomcat.comet.support") == 
Boolean.TRUE 
+                    && request.getWrapper() instanceof CometProcessor) {
+                request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE);
+                comet = true;
+            }
+
+            if (!comet) {
+                response.finishResponse();
+                req.action( ActionCode.ACTION_POST_REQUEST , null);
+            }
 
         } catch (IOException e) {
             ;
@@ -157,8 +191,10 @@
             log.error(sm.getString("coyoteAdapter.service"), t);
         } finally {
             // Recycle the wrapper request and response
-            request.recycle();
-            response.recycle();
+            if (!comet) {
+                request.recycle();
+                response.recycle();
+            }
         }
 
     }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java Wed 
May 17 05:55:39 2006
@@ -1294,6 +1294,12 @@
         if (readOnlyAttributes.containsKey(name)) {
             return;
         }
+
+        // Pass special attributes to the native layer
+        if (name.startsWith("org.apache.tomcat.")) {
+            coyoteRequest.getAttributes().remove(name);
+        }
+
         found = attributes.containsKey(name);
         if (found) {
             value = attributes.get(name);
@@ -1301,7 +1307,7 @@
         } else {
             return;
         }
-
+        
         // Notify interested application event listeners
         Object listeners[] = context.getApplicationEventListeners();
         if ((listeners == null) || (listeners.length == 0))

Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java?rev=407241&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java 
(added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java 
Wed May 17 05:55:39 2006
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.servlets;
+
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometProcessor;
+
+
+/**
+ * Helper class to implement Comet functionality.
+ */
+public abstract class CometServlet
+    extends HttpServlet implements CometProcessor {
+
+    public void begin(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        
+    }
+    
+    public void end(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        request.removeAttribute("org.apache.tomcat.comet");
+    }
+    
+    public void error(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        end(request, response);
+    }
+    
+    public abstract void read(HttpServletRequest request, HttpServletResponse 
response)
+        throws IOException, ServletException;
+
+    protected void service(HttpServletRequest request, HttpServletResponse 
response)
+        throws IOException, ServletException {
+        
+        if (request.getAttribute("org.apache.tomcat.comet.support") == 
Boolean.TRUE) {
+            begin(request, response);
+        } else {
+            // FIXME: Implement without comet support
+            begin(request, response);
+            
+            // Loop reading data
+            
+            end(request, response);
+        }
+        
+    }
+
+}

Propchange: 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java Wed May 
17 05:55:39 2006
@@ -35,6 +35,7 @@
 import org.apache.tomcat.util.modeler.Registry;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint.Handler;
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -429,7 +430,12 @@
             this.proto = proto;
         }
 
-        public boolean process(long socket) {
+        // FIXME: Support for this could be added in AJP as well
+        public SocketState event(long socket, boolean error) {
+            return SocketState.CLOSED;
+        }
+        
+        public SocketState process(long socket) {
             AjpAprProcessor processor = null;
             try {
                 processor = (AjpAprProcessor) localProcessor.get();
@@ -460,7 +466,11 @@
                     ((ActionHook) processor).action(ActionCode.ACTION_START, 
null);
                 }
 
-                return processor.process(socket);
+                if (processor.process(socket)) {
+                    return SocketState.OPEN;
+                } else {
+                    return SocketState.CLOSED;
+                }
 
             } catch(java.net.SocketException e) {
                 // SocketExceptions are normal
@@ -487,7 +497,7 @@
                     ((ActionHook) processor).action(ActionCode.ACTION_STOP, 
null);
                 }
             }
-            return false;
+            return SocketState.CLOSED;
         }
     }
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 
Wed May 17 05:55:39 2006
@@ -52,6 +52,7 @@
 import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -147,12 +148,6 @@
 
 
     /**
-     * State flag.
-     */
-    protected boolean started = false;
-
-
-    /**
      * Error flag.
      */
     protected boolean error = false;
@@ -183,6 +178,12 @@
 
 
     /**
+     * Comet used.
+     */
+    protected boolean comet = false;
+
+
+    /**
      * Content delimitator for the request (if false, the connection will
      * be closed at the end of the request).
      */
@@ -735,7 +736,53 @@
      *
      * @throws IOException error during an I/O operation
      */
-    public boolean process(long socket)
+    public SocketState event(boolean error)
+        throws IOException {
+        
+        RequestInfo rp = request.getRequestProcessor();
+        
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            if (error) {
+                request.setAttribute("org.apache.tomcat.comet.error", 
Boolean.TRUE);
+            }
+            // FIXME: It is also possible to add a new "event" method in the 
adapter
+            // or something similar
+            adapter.service(request, response);
+            if (request.getAttribute("org.apache.tomcat.comet") == null) {
+                comet = false;
+                endpoint.getCometPoller().remove(socket);
+            }
+        } catch (InterruptedIOException e) {
+            error = true;
+        } catch (Throwable t) {
+            log.error(sm.getString("http11processor.request.process"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            error = true;
+        }
+        
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (error) {
+            recycle();
+            return SocketState.CLOSED;
+        } else if (!comet) {
+            recycle();
+            endpoint.getPoller().add(socket);
+            return SocketState.OPEN;
+        } else {
+            return SocketState.LONG;
+        }
+    }
+    
+    /**
+     * Process pipelined HTTP requests using the specified input and output
+     * streams.
+     *
+     * @throws IOException error during an I/O operation
+     */
+    public SocketState process(long socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -768,7 +815,7 @@
         boolean keptAlive = false;
         boolean openSocket = false;
 
-        while (started && !error && keepAlive) {
+        while (!error && keepAlive) {
 
             // Parsing the request header
             try {
@@ -833,7 +880,10 @@
                         error = response.getErrorException() != null ||
                                 statusDropsConnection(response.getStatus());
                     }
-
+                    // Comet support
+                    if (request.getAttribute("org.apache.tomcat.comet") != 
null) {
+                        comet = true;
+                    }
                 } catch (InterruptedIOException e) {
                     error = true;
                 } catch (Throwable t) {
@@ -845,25 +895,8 @@
             }
 
             // Finish the handling of the request
-            try {
-                rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
-                inputBuffer.endRequest();
-            } catch (IOException e) {
-                error = true;
-            } catch (Throwable t) {
-                log.error(sm.getString("http11processor.request.finish"), t);
-                // 500 - Internal Server Error
-                response.setStatus(500);
-                error = true;
-            }
-            try {
-                rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
-                outputBuffer.endRequest();
-            } catch (IOException e) {
-                error = true;
-            } catch (Throwable t) {
-                log.error(sm.getString("http11processor.response.finish"), t);
-                error = true;
+            if (!comet) {
+                endRequest();
             }
 
             // If there was an error, make sure the request is counted as
@@ -873,17 +906,8 @@
             }
             request.updateCounters();
 
-            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
-
-            // Don't reset the param - we'll see it as ended. Next request
-            // will reset it
-            // thrA.setParam(null);
-            // Next request
-            inputBuffer.nextRequest();
-            outputBuffer.nextRequest();
-
             // Do sendfile as needed: add socket to sendfile and end
-            if (sendfileData != null) {
+            if (sendfileData != null && !error) {
                 sendfileData.socket = socket;
                 sendfileData.keepAlive = keepAlive;
                 if (!endpoint.getSendfile().add(sendfileData)) {
@@ -892,19 +916,63 @@
                 }
             }
             
+            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
+
         }
 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
 
-        // Recycle
+        if (comet) {
+            if (error) {
+                recycle();
+                return SocketState.CLOSED;
+            } else {
+                endpoint.getCometPoller().add(socket);
+                return SocketState.LONG;
+            }
+        } else {
+            recycle();
+            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
+        }
+        
+    }
+
+    
+    public void endRequest() {
+        
+        // Finish the handling of the request
+        try {
+            inputBuffer.endRequest();
+        } catch (IOException e) {
+            error = true;
+        } catch (Throwable t) {
+            log.error(sm.getString("http11processor.request.finish"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            error = true;
+        }
+        try {
+            outputBuffer.endRequest();
+        } catch (IOException e) {
+            error = true;
+        } catch (Throwable t) {
+            log.error(sm.getString("http11processor.response.finish"), t);
+            error = true;
+        }
+
+        // Next request
+        inputBuffer.nextRequest();
+        outputBuffer.nextRequest();
+        
+    }
+    
+    
+    public void recycle() {
         inputBuffer.recycle();
         outputBuffer.recycle();
         this.socket = 0;
-
-        return openSocket;
-        
     }
-
+    
 
     // ----------------------------------------------------- ActionHook Methods
 
@@ -966,6 +1034,7 @@
             // End the processing of the current request, and stop any further
             // transactions with the client
 
+            comet = false;
             try {
                 outputBuffer.endRequest();
             } catch (IOException e) {
@@ -985,14 +1054,6 @@
 
             // Do nothing
 
-        } else if (actionCode == ActionCode.ACTION_START) {
-
-            started = true;
-
-        } else if (actionCode == ActionCode.ACTION_STOP) {
-
-            started = false;
-
         } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {
 
             // Get remote host address
@@ -1368,6 +1429,8 @@
         if (endpoint.getUseSendfile()) {
             request.setAttribute("org.apache.tomcat.sendfile.support", 
Boolean.TRUE);
         }
+        // Advertise comet support through a request attribute
+        request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
         
     }
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 
Wed May 17 05:55:39 2006
@@ -20,6 +20,7 @@
 import java.net.URLEncoder;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import javax.management.MBeanRegistration;
@@ -598,20 +599,73 @@
     // --------------------  Connection handler --------------------
 
     static class Http11ConnectionHandler implements Handler {
-        Http11AprProtocol proto;
-        static int count=0;
-        RequestGroupInfo global=new RequestGroupInfo();
-        ThreadLocal localProcessor = new ThreadLocal();
+        
+        protected Http11AprProtocol proto;
+        protected static int count = 0;
+        protected RequestGroupInfo global = new RequestGroupInfo();
+        
+        protected ThreadLocal<Http11AprProcessor> localProcessor = 
+            new ThreadLocal<Http11AprProcessor>();
+        protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
+            new ConcurrentHashMap<Long, Http11AprProcessor>();
+        protected java.util.Stack<Http11AprProcessor> recycledProcessors = 
+            new java.util.Stack<Http11AprProcessor>();
 
-        Http11ConnectionHandler( Http11AprProtocol proto ) {
-            this.proto=proto;
+        Http11ConnectionHandler(Http11AprProtocol proto) {
+            this.proto = proto;
         }
 
-        public boolean process(long socket) {
+        public SocketState event(long socket, boolean error) {
+            Http11AprProcessor result = connections.get(socket);
+            SocketState state = SocketState.CLOSED; 
+            if (result != null) {
+                boolean recycle = error;
+                // Call the appropriate event
+                try {
+                    state = result.event(error);
+                } catch (java.net.SocketException e) {
+                    // SocketExceptions are normal
+                    Http11AprProtocol.log.debug
+                        (sm.getString
+                            ("http11protocol.proto.socketexception.debug"), e);
+                } catch (java.io.IOException e) {
+                    // IOExceptions are normal
+                    Http11AprProtocol.log.debug
+                        (sm.getString
+                            ("http11protocol.proto.ioexception.debug"), e);
+                }
+                // Future developers: if you discover any other
+                // rare-but-nonfatal exceptions, catch them here, and log as
+                // above.
+                catch (Throwable e) {
+                    // any other exception or error is odd. Here we log it
+                    // with "ERROR" level, so it will show up even on
+                    // less-than-verbose logs.
+                    Http11AprProtocol.log.error
+                        (sm.getString("http11protocol.proto.error"), e);
+                } finally {
+                    if (state != SocketState.LONG) {
+                        connections.remove(socket);
+                        recycledProcessors.push(result);
+                    }
+                }
+            }
+            return state;
+        }
+        
+        public SocketState process(long socket) {
             Http11AprProcessor processor = null;
             try {
                 processor = (Http11AprProcessor) localProcessor.get();
                 if (processor == null) {
+                    synchronized (recycledProcessors) {
+                        if (!recycledProcessors.isEmpty()) {
+                            processor = recycledProcessors.pop();
+                            localProcessor.set(processor);
+                        }
+                    }
+                }
+                if (processor == null) {
                     processor =
                         new Http11AprProcessor(proto.maxHttpHeaderSize, 
proto.ep);
                     processor.setAdapter(proto.adapter);
@@ -647,7 +701,15 @@
                     ((ActionHook) processor).action(ActionCode.ACTION_START, 
null);
                 }
 
-                return processor.process(socket);
+                SocketState state = processor.process(socket);
+                if (state == SocketState.LONG) {
+                    // Associate the connection with the processor. The next 
request 
+                    // processed by this thread will use either a new or a 
recycled
+                    // processor.
+                    connections.put(socket, processor);
+                    localProcessor.set(null);
+                }
+                return state;
 
             } catch(java.net.SocketException e) {
                 // SocketExceptions are normal
@@ -669,15 +731,8 @@
                 // less-than-verbose logs.
                 Http11AprProtocol.log.error
                     (sm.getString("http11protocol.proto.error"), e);
-            } finally {
-                //       if(proto.adapter != null) proto.adapter.recycle();
-                //                processor.recycle();
-
-                if (processor instanceof ActionHook) {
-                    ((ActionHook) processor).action(ActionCode.ACTION_STOP, 
null);
-                }
             }
-            return false;
+            return SocketState.CLOSED;
         }
     }
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java 
Wed May 17 05:55:39 2006
@@ -329,8 +329,7 @@
      * consumed. This method only resets all the pointers so that we are ready
      * to parse the next HTTP request.
      */
-    public void nextRequest()
-        throws IOException {
+    public void nextRequest() {
 
         // Recycle Request object
         request.recycle();

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed 
May 17 05:55:39 2006
@@ -300,6 +300,14 @@
 
 
     /**
+     * Allow comet request handling.
+     */
+    protected boolean useComet = true;
+    public void setUseComet(boolean useComet) { this.useComet = useComet; }
+    public boolean getUseComet() { return useComet; }
+
+
+    /**
      * Acceptor thread count.
      */
     protected int acceptorThreadCount = 0;
@@ -335,6 +343,17 @@
 
 
     /**
+     * The socket poller used for Comet support.
+     */
+    protected Poller[] cometPollers = null;
+    protected int cometPollerRoundRobin = 0;
+    public Poller getCometPoller() {
+        cometPollerRoundRobin = (cometPollerRoundRobin + 1) % 
cometPollers.length;
+        return cometPollers[cometPollerRoundRobin];
+    }
+
+
+    /**
      * The static file sender.
      */
     protected Sendfile[] sendfiles = null;
@@ -561,11 +580,8 @@
             addressStr = address.getHostAddress();
         }
         int family = Socket.APR_INET;
-        if (Library.APR_HAVE_IPV6) {
-            if (addressStr == null)
-                family = Socket.APR_UNSPEC;
-            else if (addressStr.indexOf(':') >= 0)
-                family = Socket.APR_UNSPEC;
+        if (Library.APR_HAVE_IPV6 && (addressStr == null || 
addressStr.indexOf(':') >= 0)) {
+            family = Socket.APR_UNSPEC;
         }
         long inetAddress = Address.info(addressStr, family,
                 port, 0, rootPool);
@@ -712,7 +728,7 @@
             // Start poller threads
             pollers = new Poller[pollerThreadCount];
             for (int i = 0; i < pollerThreadCount; i++) {
-                pollers[i] = new Poller();
+                pollers[i] = new Poller(false);
                 pollers[i].init();
                 Thread pollerThread = new Thread(pollers[i], getName() + 
"-Poller-" + i);
                 pollerThread.setPriority(threadPriority);
@@ -720,6 +736,17 @@
                 pollerThread.start();
             }
 
+            // Start comet poller threads
+            cometPollers = new Poller[pollerThreadCount];
+            for (int i = 0; i < pollerThreadCount; i++) {
+                cometPollers[i] = new Poller(true);
+                cometPollers[i].init();
+                Thread pollerThread = new Thread(cometPollers[i], getName() + 
"-CometPoller-" + i);
+                pollerThread.setPriority(threadPriority);
+                pollerThread.setDaemon(true);
+                pollerThread.start();
+            }
+
             // Start sendfile threads
             if (useSendfile) {
                 sendfiles = new Sendfile[sendfileThreadCount];
@@ -998,6 +1025,26 @@
     }
     
 
+    /**
+     * Process given socket for an event.
+     */
+    protected boolean processSocket(long socket, boolean error) {
+        try {
+            if (executor == null) {
+                getWorkerThread().assign(socket, error);
+            } else {
+                executor.execute(new SocketEventProcessor(socket, error));
+            }
+        } catch (Throwable t) {
+            // This means we got an OOM or similar creating a thread, or that
+            // the pool and its queue are full
+            log.error(sm.getString("endpoint.process.fail"), t);
+            return false;
+        }
+        return true;
+    }
+    
+
     // --------------------------------------------------- Acceptor Inner Class
 
 
@@ -1060,10 +1107,18 @@
 
         protected long[] addS;
         protected int addCount = 0;
+        protected long[] removeS;
+        protected int removeCount = 0;
         
+        protected boolean comet = true;
+
         protected int keepAliveCount = 0;
         public int getKeepAliveCount() { return keepAliveCount; }
 
+        public Poller(boolean comet) {
+            this.comet = comet;
+        }
+        
         /**
          * Create the poller. With some versions of APR, the maximum poller 
size will
          * be 62 (reocmpiling APR is necessary to remove this limitation).
@@ -1071,19 +1126,29 @@
         protected void init() {
             pool = Pool.create(serverSockPool);
             int size = pollerSize / pollerThreadCount;
-            serverPollset = allocatePoller(size, pool, soTimeout);
+            int timeout = soTimeout;
+            if (comet) {
+                // FIXME: Find an appropriate timeout value, for now, "longer 
than usual"
+                // semms appropriate
+                timeout = soTimeout * 20;
+            }
+            serverPollset = allocatePoller(size, pool, timeout);
             if (serverPollset == 0 && size > 1024) {
                 size = 1024;
-                serverPollset = allocatePoller(size, pool, soTimeout);
+                serverPollset = allocatePoller(size, pool, timeout);
             }
             if (serverPollset == 0) {
                 size = 62;
-                serverPollset = allocatePoller(size, pool, soTimeout);
+                serverPollset = allocatePoller(size, pool, timeout);
             }
             desc = new long[size * 2];
             keepAliveCount = 0;
             addS = new long[size];
             addCount = 0;
+            if (comet) {
+                removeS = new long[size];
+            }
+            removeCount = 0;
         }
 
         /**
@@ -1092,18 +1157,32 @@
         protected void destroy() {
             // Close all sockets in the add queue
             for (int i = 0; i < addCount; i++) {
+                if (comet) {
+                    processSocket(addS[i], true);
+                }
                 Socket.destroy(addS[i]);
             }
+            // Close all sockets in the remove queue
+            for (int i = 0; i < removeCount; i++) {
+                if (comet) {
+                    processSocket(removeS[i], true);
+                }
+                Socket.destroy(removeS[i]);
+            }
             // Close all sockets still in the poller
             int rv = Poll.pollset(serverPollset, desc);
             if (rv > 0) {
                 for (int n = 0; n < rv; n++) {
+                    if (comet) {
+                        processSocket(desc[n*2+1], true);
+                    }
                     Socket.destroy(desc[n*2+1]);
                 }
             }
             Pool.destroy(pool);
             keepAliveCount = 0;
             addCount = 0;
+            removeCount = 0;
         }
 
         /**
@@ -1120,6 +1199,9 @@
                 // at most for pollTime before being polled
                 if (addCount >= addS.length) {
                     // Can't do anything: close the socket right away
+                    if (comet) {
+                        processSocket(socket, true);
+                    }
                     Socket.destroy(socket);
                     return;
                 }
@@ -1130,6 +1212,30 @@
         }
 
         /**
+         * Remove specified socket and associated pool from the poller. The 
socket will
+         * be added to a temporary array, and polled first after a maximum 
amount
+         * of time equal to pollTime (in most cases, latency will be much 
lower,
+         * however). Note that this is automatic, except if the poller is used 
for
+         * comet.
+         *
+         * @param socket to remove from the poller
+         */
+        public void remove(long socket) {
+            synchronized (this) {
+                // Add socket to the list. Newly added sockets will wait
+                // at most for pollTime before being polled
+                if (removeCount >= removeS.length) {
+                    // Normally, it cannot happen ...
+                    Socket.destroy(socket);
+                    return;
+                }
+                removeS[removeCount] = socket;
+                removeCount++;
+                this.notify();
+            }
+        }
+
+        /**
          * The background thread that listens for incoming TCP/IP connections 
and
          * hands them off to an appropriate processor.
          */
@@ -1171,23 +1277,41 @@
                                     keepAliveCount++;
                                 } else {
                                     // Can't do anything: close the socket 
right away
+                                    if (comet) {
+                                        processSocket(addS[i], true);
+                                    }
                                     Socket.destroy(addS[i]);
                                 }
                             }
                             addCount = 0;
                         }
                     }
+                    // Remove sockets which are waiting to the poller
+                    if (removeCount > 0) {
+                        synchronized (this) {
+                            for (int i = 0; i < removeCount; i++) {
+                                int rv = Poll.remove(serverPollset, 
removeS[i]);
+                            }
+                            removeCount = 0;
+                        }
+                    }
+
                     maintainTime += pollTime;
                     // Pool for the specified interval
-                    int rv = Poll.poll(serverPollset, pollTime, desc, true);
+                    int rv = Poll.poll(serverPollset, pollTime, desc, !comet);
                     if (rv > 0) {
                         keepAliveCount -= rv;
                         for (int n = 0; n < rv; n++) {
                             // Check for failed sockets and hand this socket 
off to a worker
                             if (((desc[n*2] & Poll.APR_POLLHUP) == 
Poll.APR_POLLHUP)
                                     || ((desc[n*2] & Poll.APR_POLLERR) == 
Poll.APR_POLLERR)
+                                    || (comet && (!processSocket(desc[n*2+1], 
false))) 
                                     || (!processSocket(desc[n*2+1]))) {
                                 // Close socket and clear pool
+                                if (comet) {
+                                    processSocket(desc[n*2+1], true);
+                                    Poll.remove(serverPollset, desc[n*2+1]);
+                                }
                                 Socket.destroy(desc[n*2+1]);
                                 continue;
                             }
@@ -1215,6 +1339,11 @@
                             keepAliveCount -= rv;
                             for (int n = 0; n < rv; n++) {
                                 // Close socket and clear pool
+                                if (comet) {
+                                    // FIXME: should really close in case of 
timeout ?
+                                    // FIXME: maybe comet should use an 
extended timeout
+                                    processSocket(desc[n], true);
+                                }
                                 Socket.destroy(desc[n]);
                             }
                         }
@@ -1242,6 +1371,8 @@
         protected Thread thread = null;
         protected boolean available = false;
         protected long socket = 0;
+        protected boolean event = false;
+        protected boolean error = false;
 
 
         /**
@@ -1265,6 +1396,28 @@
 
             // Store the newly available Socket and notify our thread
             this.socket = socket;
+            event = false;
+            error = false;
+            available = true;
+            notifyAll();
+
+        }
+
+
+        protected synchronized void assign(long socket, boolean error) {
+
+            // Wait for the Processor to get the previous Socket
+            while (available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Store the newly available Socket and notify our thread
+            this.socket = socket;
+            event = true;
+            this.error = error;
             available = true;
             notifyAll();
 
@@ -1310,7 +1463,11 @@
                     continue;
 
                 // Process the request from this socket
-                if (!handler.process(socket)) {
+                if ((event) && (handler.event(socket, error) == 
Handler.SocketState.CLOSED)) {
+                    // Close socket and pool
+                    Socket.destroy(socket);
+                    socket = 0;
+                } else if (handler.process(socket) == 
Handler.SocketState.CLOSED) {
                     // Close socket and pool
                     Socket.destroy(socket);
                     socket = 0;
@@ -1622,7 +1779,11 @@
      * thread local fields.
      */
     public interface Handler {
-        public boolean process(long socket);
+        public enum SocketState {
+            OPEN, CLOSED, LONG
+        }
+        public SocketState process(long socket);
+        public SocketState event(long socket, boolean error);
     }
 
 
@@ -1700,7 +1861,38 @@
         public void run() {
 
             // Process the request from this socket
-            if (!handler.process(socket)) {
+            if (handler.process(socket) == Handler.SocketState.CLOSED) {
+                // Close socket and pool
+                Socket.destroy(socket);
+                socket = 0;
+            }
+
+        }
+        
+    }
+    
+    
+    // --------------------------------------- SocketEventProcessor Inner Class
+
+
+    /**
+     * This class is the equivalent of the Worker, but will simply use in an
+     * external Executor thread pool.
+     */
+    protected class SocketEventProcessor implements Runnable {
+        
+        protected long socket = 0;
+        protected boolean error = false; 
+        
+        public SocketEventProcessor(long socket, boolean error) {
+            this.socket = socket;
+            this.error = error;
+        }
+
+        public void run() {
+
+            // Process the request from this socket
+            if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
                 // Close socket and pool
                 Socket.destroy(socket);
                 socket = 0;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to