Author: remm Date: Wed May 17 05:55:39 2006 New Revision: 407241 URL: http://svn.apache.org/viewcvs?rev=407241&view=rev Log: - Start work on comet support. Note: it doesn't work yet, I think (I didn't test), and most of this is very preliminary. It is relatively straightforward, though.
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java (with props) tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java (with props) Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java?rev=407241&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java Wed May 17 05:55:39 2006 @@ -0,0 +1,21 @@ +package org.apache.catalina; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public interface CometProcessor { + + public void begin(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException; + public void end(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException; + + public void error(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException; + public void read(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException; + +} Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java 
------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Wed May 17 05:55:39 2006 @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.catalina.CometProcessor; import org.apache.catalina.Context; import org.apache.catalina.Globals; import org.apache.catalina.Wrapper; @@ -135,10 +136,35 @@ } + // Comet processing + if (request.getWrapper() != null + && request.getWrapper() instanceof CometProcessor) { + try { + if (request.getAttribute("org.apache.tomcat.comet.error") != null) { + ((CometProcessor) request.getWrapper()).error(request.getRequest(), response.getResponse()); + } else { + ((CometProcessor) request.getWrapper()).read(request.getRequest(), response.getResponse()); + } + } catch (IOException e) { + ; + } catch (Throwable t) { + log.error(sm.getString("coyoteAdapter.service"), t); + } finally { + // Recycle the wrapper request and response + if (request.getAttribute("org.apache.tomcat.comet") == null) { + request.recycle(); + response.recycle(); + } + } + return; + } + if (connector.getXpoweredBy()) { response.addHeader("X-Powered-By", "Servlet/2.5"); } + boolean comet = false; + try { // Parse and set Catalina and configuration specific @@ -148,8 +174,16 @@ connector.getContainer().getPipeline().getFirst().invoke(request, response); } - response.finishResponse(); - req.action( ActionCode.ACTION_POST_REQUEST , null); + if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE + && request.getWrapper() 
instanceof CometProcessor) { + request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE); + comet = true; + } + + if (!comet) { + response.finishResponse(); + req.action( ActionCode.ACTION_POST_REQUEST , null); + } } catch (IOException e) { ; @@ -157,8 +191,10 @@ log.error(sm.getString("coyoteAdapter.service"), t); } finally { // Recycle the wrapper request and response - request.recycle(); - response.recycle(); + if (!comet) { + request.recycle(); + response.recycle(); + } } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java Wed May 17 05:55:39 2006 @@ -1294,6 +1294,12 @@ if (readOnlyAttributes.containsKey(name)) { return; } + + // Pass special attributes to the native layer + if (name.startsWith("org.apache.tomcat.")) { + coyoteRequest.getAttributes().remove(name); + } + found = attributes.containsKey(name); if (found) { value = attributes.get(name); @@ -1301,7 +1307,7 @@ } else { return; } - + // Notify interested application event listeners Object listeners[] = context.getApplicationEventListeners(); if ((listeners == null) || (listeners.length == 0)) Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java?rev=407241&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java Wed May 17 05:55:39 2006 @@ -0,0 +1,73 @@ +/* + * Copyright 
2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.catalina.servlets; + + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.catalina.CometProcessor; + + +/** + * Helper class to implement Comet functionality. 
+ */ +public abstract class CometServlet + extends HttpServlet implements CometProcessor { + + public void begin(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + + } + + public void end(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + request.removeAttribute("org.apache.tomcat.comet"); + } + + public void error(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + end(request, response); + } + + public abstract void read(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException; + + protected void service(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + + if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE) { + begin(request, response); + } else { + // FIXME: Implement without comet support + begin(request, response); + + // Loop reading data + + end(request, response); + } + + } + +} Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java Wed May 17 05:55:39 2006 @@ -35,6 +35,7 @@ import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.AprEndpoint.Handler; +import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState; import 
org.apache.tomcat.util.res.StringManager; @@ -429,7 +430,12 @@ this.proto = proto; } - public boolean process(long socket) { + // FIXME: Support for this could be added in AJP as well + public SocketState event(long socket, boolean error) { + return SocketState.CLOSED; + } + + public SocketState process(long socket) { AjpAprProcessor processor = null; try { processor = (AjpAprProcessor) localProcessor.get(); @@ -460,7 +466,11 @@ ((ActionHook) processor).action(ActionCode.ACTION_START, null); } - return processor.process(socket); + if (processor.process(socket)) { + return SocketState.OPEN; + } else { + return SocketState.CLOSED; + } } catch(java.net.SocketException e) { // SocketExceptions are normal @@ -487,7 +497,7 @@ ((ActionHook) processor).action(ActionCode.ACTION_STOP, null); } } - return false; + return SocketState.CLOSED; } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java Wed May 17 05:55:39 2006 @@ -52,6 +52,7 @@ import org.apache.tomcat.util.http.FastHttpDateFormat; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AprEndpoint; +import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; @@ -147,12 +148,6 @@ /** - * State flag. - */ - protected boolean started = false; - - - /** * Error flag. */ protected boolean error = false; @@ -183,6 +178,12 @@ /** + * Comet used. + */ + protected boolean comet = false; + + + /** * Content delimitator for the request (if false, the connection will * be closed at the end of the request). 
*/ @@ -735,7 +736,53 @@ * * @throws IOException error during an I/O operation */ - public boolean process(long socket) + public SocketState event(boolean error) + throws IOException { + + RequestInfo rp = request.getRequestProcessor(); + + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + if (error) { + request.setAttribute("org.apache.tomcat.comet.error", Boolean.TRUE); + } + // FIXME: It is also possible to add a new "event" method in the adapter + // or something similar + adapter.service(request, response); + if (request.getAttribute("org.apache.tomcat.comet") == null) { + comet = false; + endpoint.getCometPoller().remove(socket); + } + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (error) { + recycle(); + return SocketState.CLOSED; + } else if (!comet) { + recycle(); + endpoint.getPoller().add(socket); + return SocketState.OPEN; + } else { + return SocketState.LONG; + } + } + + /** + * Process pipelined HTTP requests using the specified input and output + * streams. 
+ * + * @throws IOException error during an I/O operation + */ + public SocketState process(long socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); @@ -768,7 +815,7 @@ boolean keptAlive = false; boolean openSocket = false; - while (started && !error && keepAlive) { + while (!error && keepAlive) { // Parsing the request header try { @@ -833,7 +880,10 @@ error = response.getErrorException() != null || statusDropsConnection(response.getStatus()); } - + // Comet support + if (request.getAttribute("org.apache.tomcat.comet") != null) { + comet = true; + } } catch (InterruptedIOException e) { error = true; } catch (Throwable t) { @@ -845,25 +895,8 @@ } // Finish the handling of the request - try { - rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); - inputBuffer.endRequest(); - } catch (IOException e) { - error = true; - } catch (Throwable t) { - log.error(sm.getString("http11processor.request.finish"), t); - // 500 - Internal Server Error - response.setStatus(500); - error = true; - } - try { - rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); - outputBuffer.endRequest(); - } catch (IOException e) { - error = true; - } catch (Throwable t) { - log.error(sm.getString("http11processor.response.finish"), t); - error = true; + if (!comet) { + endRequest(); } // If there was an error, make sure the request is counted as @@ -873,17 +906,8 @@ } request.updateCounters(); - rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); - - // Don't reset the param - we'll see it as ended. 
Next request - // will reset it - // thrA.setParam(null); - // Next request - inputBuffer.nextRequest(); - outputBuffer.nextRequest(); - // Do sendfile as needed: add socket to sendfile and end - if (sendfileData != null) { + if (sendfileData != null && !error) { sendfileData.socket = socket; sendfileData.keepAlive = keepAlive; if (!endpoint.getSendfile().add(sendfileData)) { @@ -892,19 +916,63 @@ } } + rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); + } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - // Recycle + if (comet) { + if (error) { + recycle(); + return SocketState.CLOSED; + } else { + endpoint.getCometPoller().add(socket); + return SocketState.LONG; + } + } else { + recycle(); + return (openSocket) ? SocketState.OPEN : SocketState.CLOSED; + } + + } + + + public void endRequest() { + + // Finish the handling of the request + try { + inputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.finish"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + try { + outputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.response.finish"), t); + error = true; + } + + // Next request + inputBuffer.nextRequest(); + outputBuffer.nextRequest(); + + } + + + public void recycle() { inputBuffer.recycle(); outputBuffer.recycle(); this.socket = 0; - - return openSocket; - } - + // ----------------------------------------------------- ActionHook Methods @@ -966,6 +1034,7 @@ // End the processing of the current request, and stop any further // transactions with the client + comet = false; try { outputBuffer.endRequest(); } catch (IOException e) { @@ -985,14 +1054,6 @@ // Do nothing - } else if (actionCode == ActionCode.ACTION_START) { - - started = true; - - } else if (actionCode == ActionCode.ACTION_STOP) { - - started = false; - } else if (actionCode == 
ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) { // Get remote host address @@ -1368,6 +1429,8 @@ if (endpoint.getUseSendfile()) { request.setAttribute("org.apache.tomcat.sendfile.support", Boolean.TRUE); } + // Advertise comet support through a request attribute + request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE); } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Wed May 17 05:55:39 2006 @@ -20,6 +20,7 @@ import java.net.URLEncoder; import java.util.Hashtable; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import javax.management.MBeanRegistration; @@ -598,20 +599,73 @@ // -------------------- Connection handler -------------------- static class Http11ConnectionHandler implements Handler { - Http11AprProtocol proto; - static int count=0; - RequestGroupInfo global=new RequestGroupInfo(); - ThreadLocal localProcessor = new ThreadLocal(); + + protected Http11AprProtocol proto; + protected static int count = 0; + protected RequestGroupInfo global = new RequestGroupInfo(); + + protected ThreadLocal<Http11AprProcessor> localProcessor = + new ThreadLocal<Http11AprProcessor>(); + protected ConcurrentHashMap<Long, Http11AprProcessor> connections = + new ConcurrentHashMap<Long, Http11AprProcessor>(); + protected java.util.Stack<Http11AprProcessor> recycledProcessors = + new java.util.Stack<Http11AprProcessor>(); - Http11ConnectionHandler( Http11AprProtocol proto ) { - this.proto=proto; + Http11ConnectionHandler(Http11AprProtocol proto) { + this.proto = proto; } - public 
boolean process(long socket) { + public SocketState event(long socket, boolean error) { + Http11AprProcessor result = connections.get(socket); + SocketState state = SocketState.CLOSED; + if (result != null) { + boolean recycle = error; + // Call the appropriate event + try { + state = result.event(error); + } catch (java.net.SocketException e) { + // SocketExceptions are normal + Http11AprProtocol.log.debug + (sm.getString + ("http11protocol.proto.socketexception.debug"), e); + } catch (java.io.IOException e) { + // IOExceptions are normal + Http11AprProtocol.log.debug + (sm.getString + ("http11protocol.proto.ioexception.debug"), e); + } + // Future developers: if you discover any other + // rare-but-nonfatal exceptions, catch them here, and log as + // above. + catch (Throwable e) { + // any other exception or error is odd. Here we log it + // with "ERROR" level, so it will show up even on + // less-than-verbose logs. + Http11AprProtocol.log.error + (sm.getString("http11protocol.proto.error"), e); + } finally { + if (state != SocketState.LONG) { + connections.remove(socket); + recycledProcessors.push(result); + } + } + } + return state; + } + + public SocketState process(long socket) { Http11AprProcessor processor = null; try { processor = (Http11AprProcessor) localProcessor.get(); if (processor == null) { + synchronized (recycledProcessors) { + if (!recycledProcessors.isEmpty()) { + processor = recycledProcessors.pop(); + localProcessor.set(processor); + } + } + } + if (processor == null) { processor = new Http11AprProcessor(proto.maxHttpHeaderSize, proto.ep); processor.setAdapter(proto.adapter); @@ -647,7 +701,15 @@ ((ActionHook) processor).action(ActionCode.ACTION_START, null); } - return processor.process(socket); + SocketState state = processor.process(socket); + if (state == SocketState.LONG) { + // Associate the connection with the processor. The next request + // processed by this thread will use either a new or a recycled + // processor. 
+ connections.put(socket, processor); + localProcessor.set(null); + } + return state; } catch(java.net.SocketException e) { // SocketExceptions are normal @@ -669,15 +731,8 @@ // less-than-verbose logs. Http11AprProtocol.log.error (sm.getString("http11protocol.proto.error"), e); - } finally { - // if(proto.adapter != null) proto.adapter.recycle(); - // processor.recycle(); - - if (processor instanceof ActionHook) { - ((ActionHook) processor).action(ActionCode.ACTION_STOP, null); - } } - return false; + return SocketState.CLOSED; } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Wed May 17 05:55:39 2006 @@ -329,8 +329,7 @@ * consumed. This method only resets all the pointers so that we are ready * to parse the next HTTP request. */ - public void nextRequest() - throws IOException { + public void nextRequest() { // Recycle Request object request.recycle(); Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=407241&r1=407240&r2=407241&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed May 17 05:55:39 2006 @@ -300,6 +300,14 @@ /** + * Allow comet request handling. 
+ */ + protected boolean useComet = true; + public void setUseComet(boolean useComet) { this.useComet = useComet; } + public boolean getUseComet() { return useComet; } + + + /** * Acceptor thread count. */ protected int acceptorThreadCount = 0; @@ -335,6 +343,17 @@ /** + * The socket poller used for Comet support. + */ + protected Poller[] cometPollers = null; + protected int cometPollerRoundRobin = 0; + public Poller getCometPoller() { + cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length; + return cometPollers[cometPollerRoundRobin]; + } + + + /** * The static file sender. */ protected Sendfile[] sendfiles = null; @@ -561,11 +580,8 @@ addressStr = address.getHostAddress(); } int family = Socket.APR_INET; - if (Library.APR_HAVE_IPV6) { - if (addressStr == null) - family = Socket.APR_UNSPEC; - else if (addressStr.indexOf(':') >= 0) - family = Socket.APR_UNSPEC; + if (Library.APR_HAVE_IPV6 && (addressStr == null || addressStr.indexOf(':') >= 0)) { + family = Socket.APR_UNSPEC; } long inetAddress = Address.info(addressStr, family, port, 0, rootPool); @@ -712,7 +728,7 @@ // Start poller threads pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { - pollers[i] = new Poller(); + pollers[i] = new Poller(false); pollers[i].init(); Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); pollerThread.setPriority(threadPriority); @@ -720,6 +736,17 @@ pollerThread.start(); } + // Start comet poller threads + cometPollers = new Poller[pollerThreadCount]; + for (int i = 0; i < pollerThreadCount; i++) { + cometPollers[i] = new Poller(true); + cometPollers[i].init(); + Thread pollerThread = new Thread(cometPollers[i], getName() + "-CometPoller-" + i); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); + } + // Start sendfile threads if (useSendfile) { sendfiles = new Sendfile[sendfileThreadCount]; @@ -998,6 +1025,26 @@ } + /** + * Process given socket for an 
event. + */ + protected boolean processSocket(long socket, boolean error) { + try { + if (executor == null) { + getWorkerThread().assign(socket, error); + } else { + executor.execute(new SocketEventProcessor(socket, error)); + } + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + + // --------------------------------------------------- Acceptor Inner Class @@ -1060,10 +1107,18 @@ protected long[] addS; protected int addCount = 0; + protected long[] removeS; + protected int removeCount = 0; + protected boolean comet = true; + protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } + public Poller(boolean comet) { + this.comet = comet; + } + /** * Create the poller. With some versions of APR, the maximum poller size will * be 62 (reocmpiling APR is necessary to remove this limitation). 
@@ -1071,19 +1126,29 @@ protected void init() { pool = Pool.create(serverSockPool); int size = pollerSize / pollerThreadCount; - serverPollset = allocatePoller(size, pool, soTimeout); + int timeout = soTimeout; + if (comet) { + // FIXME: Find an appropriate timeout value, for now, "longer than usual" + // seems appropriate + timeout = soTimeout * 20; + } + serverPollset = allocatePoller(size, pool, timeout); if (serverPollset == 0 && size > 1024) { size = 1024; - serverPollset = allocatePoller(size, pool, soTimeout); + serverPollset = allocatePoller(size, pool, timeout); } if (serverPollset == 0) { size = 62; - serverPollset = allocatePoller(size, pool, soTimeout); + serverPollset = allocatePoller(size, pool, timeout); } desc = new long[size * 2]; keepAliveCount = 0; addS = new long[size]; addCount = 0; + if (comet) { + removeS = new long[size]; + } + removeCount = 0; } /** @@ -1092,18 +1157,32 @@ protected void destroy() { // Close all sockets in the add queue for (int i = 0; i < addCount; i++) { + if (comet) { + processSocket(addS[i], true); + } Socket.destroy(addS[i]); } + // Close all sockets in the remove queue + for (int i = 0; i < removeCount; i++) { + if (comet) { + processSocket(removeS[i], true); + } + Socket.destroy(removeS[i]); + } // Close all sockets still in the poller int rv = Poll.pollset(serverPollset, desc); if (rv > 0) { for (int n = 0; n < rv; n++) { + if (comet) { + processSocket(desc[n*2+1], true); + } Socket.destroy(desc[n*2+1]); } } Pool.destroy(pool); keepAliveCount = 0; addCount = 0; + removeCount = 0; } /** @@ -1120,6 +1199,9 @@ // at most for pollTime before being polled if (addCount >= addS.length) { // Can't do anything: close the socket right away + if (comet) { + processSocket(socket, true); + } Socket.destroy(socket); return; } @@ -1130,6 +1212,30 @@ } /** + * Remove specified socket and associated pool from the poller. 
The socket will + * be added to a temporary array, and polled first after a maximum amount + * of time equal to pollTime (in most cases, latency will be much lower, + * however). Note that this is automatic, except if the poller is used for + * comet. + * + * @param socket to remove from the poller + */ + public void remove(long socket) { + synchronized (this) { + // Add socket to the list. Newly added sockets will wait + // at most for pollTime before being polled + if (removeCount >= removeS.length) { + // Normally, it cannot happen ... + Socket.destroy(socket); + return; + } + removeS[removeCount] = socket; + removeCount++; + this.notify(); + } + } + + /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ @@ -1171,23 +1277,41 @@ keepAliveCount++; } else { // Can't do anything: close the socket right away + if (comet) { + processSocket(addS[i], true); + } Socket.destroy(addS[i]); } } addCount = 0; } } + // Remove sockets which are waiting to be removed from the poller + if (removeCount > 0) { + synchronized (this) { + for (int i = 0; i < removeCount; i++) { + int rv = Poll.remove(serverPollset, removeS[i]); + } + removeCount = 0; + } + } + maintainTime += pollTime; // Pool for the specified interval - int rv = Poll.poll(serverPollset, pollTime, desc, true); + int rv = Poll.poll(serverPollset, pollTime, desc, !comet); if (rv > 0) { keepAliveCount -= rv; for (int n = 0; n < rv; n++) { // Check for failed sockets and hand this socket off to a worker if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) + || (comet && (!processSocket(desc[n*2+1], false))) || (!processSocket(desc[n*2+1]))) { // Close socket and clear pool + if (comet) { + processSocket(desc[n*2+1], true); + Poll.remove(serverPollset, desc[n*2+1]); + } Socket.destroy(desc[n*2+1]); continue; } @@ -1215,6 +1339,11 @@ keepAliveCount -= rv; for (int n = 0; n < rv; n++) { // Close socket and 
clear pool + if (comet) { + // FIXME: should really close in case of timeout ? + // FIXME: maybe comet should use an extended timeout + processSocket(desc[n], true); + } Socket.destroy(desc[n]); } } @@ -1242,6 +1371,8 @@ protected Thread thread = null; protected boolean available = false; protected long socket = 0; + protected boolean event = false; + protected boolean error = false; /** @@ -1265,6 +1396,28 @@ // Store the newly available Socket and notify our thread this.socket = socket; + event = false; + error = false; + available = true; + notifyAll(); + + } + + + protected synchronized void assign(long socket, boolean error) { + + // Wait for the Processor to get the previous Socket + while (available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Store the newly available Socket and notify our thread + this.socket = socket; + event = true; + this.error = error; available = true; notifyAll(); @@ -1310,7 +1463,11 @@ continue; // Process the request from this socket - if (!handler.process(socket)) { + if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { + // Close socket and pool + Socket.destroy(socket); + socket = 0; + } else if (handler.process(socket) == Handler.SocketState.CLOSED) { // Close socket and pool Socket.destroy(socket); socket = 0; @@ -1622,7 +1779,11 @@ * thread local fields. 
*/ public interface Handler { - public boolean process(long socket); + public enum SocketState { + OPEN, CLOSED, LONG + } + public SocketState process(long socket); + public SocketState event(long socket, boolean error); } @@ -1700,7 +1861,38 @@ public void run() { // Process the request from this socket - if (!handler.process(socket)) { + if (handler.process(socket) == Handler.SocketState.CLOSED) { + // Close socket and pool + Socket.destroy(socket); + socket = 0; + } + + } + + } + + + // --------------------------------------- SocketEventProcessor Inner Class + + + /** + * This class is the equivalent of the Worker, but will simply be used in an + * external Executor thread pool. + */ + protected class SocketEventProcessor implements Runnable { + + protected long socket = 0; + protected boolean error = false; + + public SocketEventProcessor(long socket, boolean error) { + this.socket = socket; + this.error = error; + } + + public void run() { + + // Process the request from this socket + if (handler.event(socket, error) == Handler.SocketState.CLOSED) { // Close socket and pool Socket.destroy(socket); socket = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]