Author: remm Date: Tue May 29 04:24:13 2007 New Revision: 542491 URL: http://svn.apache.org/viewvc?view=rev&rev=542491 Log: - My own view on the Comet API extensions. The main differences are: * remove robustness checks (I believe they are not useful, as they have a performance cost, only handle the most evident "issues", and even for those evident "issues" are most likely arbitrary) * very simple calls and no additional data structures exposed to the end user * merge write notifications mode with non blocking; non blocking is never configured in an explicit way * pick up the SocketStatus additions * most likely backwards compatible with Comet from Tomcat 6.0 - No actual connector updates yet (for APR, the main change is to do timeout handling using Java, which is overdue).
Modified: tomcat/sandbox/comet/java/org/apache/catalina/CometEvent.java tomcat/sandbox/comet/java/org/apache/catalina/connector/CometEventImpl.java tomcat/sandbox/comet/java/org/apache/catalina/connector/CoyoteAdapter.java tomcat/sandbox/comet/java/org/apache/catalina/connector/OutputBuffer.java tomcat/sandbox/comet/java/org/apache/catalina/connector/Request.java tomcat/sandbox/comet/java/org/apache/catalina/connector/Response.java tomcat/sandbox/comet/java/org/apache/coyote/ActionCode.java tomcat/sandbox/comet/java/org/apache/coyote/Response.java tomcat/sandbox/comet/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/sandbox/comet/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/sandbox/comet/java/org/apache/tomcat/util/net/SocketStatus.java Modified: tomcat/sandbox/comet/java/org/apache/catalina/CometEvent.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/CometEvent.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/CometEvent.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/CometEvent.java Tue May 29 04:24:13 2007 @@ -20,7 +20,6 @@ import java.io.IOException; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -145,99 +144,46 @@ * @param timeout The timeout in milliseconds for this connection, must be a positive value, larger than 0 * @throws IOException An IOException may be thrown to indicate an IO error, * or that the EOF has been reached on the connection - * @throws ServletException An exception has occurred, as specified by the root - * cause - * @throws UnsupportedOperationException if per connection timeout is not supported, either at all or at this phase - * of the invocation. 
*/ public void setTimeout(int timeout) - throws IOException, ServletException, UnsupportedOperationException; - - + throws IOException; /** - * COMET_NON_BLOCKING - * Option bit set for allowing non blocking IO - * when reading from the request or writing to the response - * COMET_NO_IO - * Option bit set to not register for any IO events - * Connections can be reregistered for IO events using the - * @see #configure(int) - */ - public enum CometConfiguration {COMET_NON_BLOCKING,COMET_NO_IO}; - - /** - * Configures the connection for desired IO options. - * By default a Comet connection is configured for <br/> - * a) Blocking IO - standard servlet usage<br/> - * b) Register for READ events when data arrives<br/> - * Tomcat Comet allows you to configure for additional options:<br/> - * the <code>COMET_NON_BLOCKING</code> bit signals whether writing and reading from the request - * or writing to the response will be non blocking.<br/> - * the <code>COMET_NO_IO</code> bit signals the container that you are not interested in - * receiving any IO events from the container. - * @param cometOptions int - the option bit set, see #COMET_NON_BLOCKING and #COMET_NO_IO - * @throws IOException - - * @throws IllegalStateException - if this method is invoked outside of the BEGIN event - */ - public void configure(CometConfiguration... options) - throws IOException, IllegalStateException; - - /** - * Returns the configuration for this Comet connection - * @return CometConfiguration[] - * @see #configure(CometConfiguration...) 
- */ - public CometConfiguration[] getConfiguration(); - - /** - * OP_CALLBACK - receive a CALLBACK event from the container - * OP_READ - receive a READ event when the connection has data to be read - * OP_WRITE - receive a WRITE event when the connection is able to receive data to be written - * @see #register(CometOperations) - */ - public enum CometOperation {OP_CALLBACK, OP_READ, OP_WRITE}; - - /** - * Registers the Comet connection with the container for IO notifications. - * These could be notifications - * @param operations - * @throws IOException - * @throws IllegalStateException - if you are trying to register with a socket that already is registered - * or if the operation you are trying to register is invalid. - */ - public void register(CometOperation... operations) - throws IOException, IllegalStateException; - - /** - * Unregisters Comet operations for this CometConnection - * @param operations CometOperation[] - * @throws IOException - * @throws IllegalStateException + * Returns true if write notifications are disabled, or if they are enabled and data may + * be written to the connection (the flag becomes false when the client is unable to accept + * data fast enough). When the flag becomes + * + * @return boolean true if you can write to the response */ - public void unregister(CometOperation... operations) - throws IOException, IllegalStateException; + public boolean isWriteable(); /** - * Returns what the current IO notifications that the Comet - * connection is registered for. - * @return CometOperations[] - * @see #register(CometOperations...) + * Returns true if the Comet connection is blocking or non blocking and data is available to be read. + * If attempting to read in non blocking mode and this flag is false, an IO exception will occur. 
+ * + * @see javax.servlet.ServletRequest#getInputStream()#available()>0 + * @return boolean */ - public CometOperation[] getRegisteredOps(); - + public boolean isReadable(); + /** - * Returns true if the Comet connection is blocking or non blocking and you can write - * without blocking data to the response - * @return boolean - true if you can write to the response + * Configure notifications that will be received. This method should be called during the processing of + * the begin event. If configure is not called, the behavior will be the same as if configure(true, false) + * is called. + * + * @param read if true, read events will be sent to the servlet + * @param write if true, the connection will be placed in non blocking mode, and write notifications + * may be requested by using the sendNotify method */ - public boolean isWriteable(); - + public void configure(boolean read, boolean write); + /** - * Returns true if the Comet connection is blocking or non blocking and data is available to be read - * @see javax.servlet.ServletRequest#getInputStream()#available()>0 - * @return boolean + * Send a notify event to the servlet. 
+ * + * @param write with the value true should be called when isWriteable becomes false, to request notification + * when the connection becomes available for writing again; with the value false, a notify event + * will be sent to the servlet */ - public boolean isReadable(); + public void callback(boolean write); } Modified: tomcat/sandbox/comet/java/org/apache/catalina/connector/CometEventImpl.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/connector/CometEventImpl.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/connector/CometEventImpl.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/connector/CometEventImpl.java Tue May 29 04:24:13 2007 @@ -69,23 +69,6 @@ */ protected EventSubType eventSubType = null; - /** - * Current set of operations - */ - protected HashSet<CometOperation> cometOperations = new HashSet<CometOperation>(3); - - /** - * Current set of configurations - */ - protected HashSet<CometConfiguration> cometConfigurations = new HashSet<CometConfiguration>(3); - - protected WorkerThreadCheck threadCheck = new WorkerThreadCheck(); - - private static final Object threadCheckHolder = new Object(); - - protected boolean readable = false; - - protected boolean writeable = false; // --------------------------------------------------------- Public Methods /** @@ -128,77 +111,41 @@ return response.getResponse(); } - public void setTimeout(int timeout) throws IOException, ServletException,UnsupportedOperationException { - //this check should get removed as soon as connection timeout is implemented everywhere. 
- if (request.getAttribute("org.apache.tomcat.comet.timeout.support") == Boolean.TRUE) { - request.action(ActionCode.ACTION_COMET_TIMEOUT,new Integer(timeout)); - } else { - throw new UnsupportedOperationException(); - } + public void setTimeout(int timeout) + throws IOException { + request.setTimeout(timeout); } public boolean isReadable() { - return readable; + return request.isReadable(); } - - public void setReadable(boolean r) { - this.readable = r; - } - + public boolean isWriteable() { - return writeable; - } - - public void setWriteable(boolean w) { - this.writeable = w; + return response.isWriteable(); } - public void configure(CometEvent.CometConfiguration... options) - throws IOException, IllegalStateException { - checkWorkerThread(); - synchronized (cometConfigurations) { - cometConfigurations.clear(); - for (CometEvent.CometConfiguration cc : options) { - cometConfigurations.add(cc); - } - request.action(ActionCode.ACTION_COMET_CONFIGURE,options); - } + /** + * Configure notifications that will be received. + * + * @param read if true, read events will be sent to the servlet + * @param write if true, the connection will be placed in non blocking mode, and write notifications + * may be requested by using the sendNotify method + */ + public void configure(boolean read, boolean write) { + } - public void register(CometEvent.CometOperation... operations) - throws IOException, IllegalStateException { - synchronized (cometOperations) { - //add it to the registered set - for (CometEvent.CometOperation co : operations) { - if (!cometOperations.contains(co)) { - cometOperations.add(co); - request.action(ActionCode.ACTION_COMET_REGISTER, co); - } - } - } + /** + * Send a notify event to the servlet. 
+ * + * @param write with the value true should be called when isWriteable becomes false, to request notification + * when the connection becomes available for writing again; with the value false, a notify event + * will be sent to the servlet + */ + public void callback(boolean write) { + } - public void unregister(CometOperation... operations) - throws IOException, IllegalStateException { - synchronized (cometOperations) { - //remove from the registered set - for (CometEvent.CometOperation co : operations) { - if (cometOperations.contains(co)) { - cometOperations.remove(co); - request.action(ActionCode.ACTION_COMET_UNREGISTER, co); - } - } - } - } - - public CometConfiguration[] getConfiguration() { - return (CometConfiguration[])cometConfigurations.toArray(new CometConfiguration[0]); - } - - public CometOperation[] getRegisteredOps() { - return (CometOperation[])cometOperations.toArray(new CometOperation[0]); - } - public String toString() { StringBuffer buf = new StringBuffer("CometEventImpl["); buf.append(super.toString()); @@ -207,25 +154,6 @@ buf.append(" SubType:"); buf.append(getEventSubType()); return buf.toString(); - } - - protected void setWorkerThread() { - threadCheck.set(threadCheckHolder); - } - - protected void unsetWorkerThread() { - threadCheck.set(null); - } - - protected void checkWorkerThread() throws IllegalStateException { - //throw exception if not on worker thread - if ( !(threadCheck.get() == threadCheckHolder) ) - throw new IllegalStateException("The operation can only be performed when invoked by a Tomcat worker thread."); - } - - //inner class used to keep track if the current thread is a worker thread. 
- private static class WorkerThreadCheck extends ThreadLocal { - } } Modified: tomcat/sandbox/comet/java/org/apache/catalina/connector/CoyoteAdapter.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/connector/CoyoteAdapter.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/connector/CoyoteAdapter.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/connector/CoyoteAdapter.java Tue May 29 04:24:13 2007 @@ -119,11 +119,8 @@ boolean error = false; boolean read = false; - CometEvent event = request.getEvent(); try { - if ( event!=null && (event instanceof CometEventImpl)) - ((CometEventImpl)event).setWorkerThread(); - if (status == SocketStatus.OPEN) { + if (status == SocketStatus.OPEN_READ) { if (response.isClosed()) { // The event has been closed asynchronously, so call end instead of // read to cleanup the pipeline @@ -149,6 +146,26 @@ request.getEvent().setEventSubType(null); } } + } else if (status == SocketStatus.OPEN_WRITE) { + if (response.isClosed()) { + // The event has been closed asynchronously, so call end instead of + // read to cleanup the pipeline + request.getEvent().setEventType(CometEvent.EventType.END); + request.getEvent().setEventSubType(null); + } else { + request.getEvent().setEventType(CometEvent.EventType.WRITE); + request.getEvent().setEventSubType(null); + } + } else if (status == SocketStatus.OPEN_CALLBACK) { + if (response.isClosed()) { + // The event has been closed asynchronously, so call end instead of + // read to cleanup the pipeline + request.getEvent().setEventType(CometEvent.EventType.END); + request.getEvent().setEventSubType(null); + } else { + request.getEvent().setEventType(CometEvent.EventType.CALLBACK); + request.getEvent().setEventSubType(null); + } } else if (status == SocketStatus.DISCONNECT) { request.getEvent().setEventType(CometEvent.EventType.ERROR); 
request.getEvent().setEventSubType(CometEvent.EventSubType.CLIENT_DISCONNECT); @@ -187,7 +204,7 @@ } if (response.isClosed() || !request.isComet()) { res.action(ActionCode.ACTION_COMET_END, null); - } else if (!error && read && request.getAvailable()) { + } else if (!error && read && request.isReadable()) { // If this was a read and not all bytes have been read, or if no data // was read from the connector, then it is an error error = true; @@ -201,9 +218,6 @@ error = true; return false; } finally { - if ( event!=null && (event instanceof CometEventImpl)) - ((CometEventImpl)event).unsetWorkerThread(); - req.getRequestProcessor().setWorkerThreadName(null); // Recycle the wrapper request and response if (error || response.isClosed() || !request.isComet()) { @@ -256,25 +270,21 @@ } boolean comet = false; - CometEvent event = null; try { // Parse and set Catalina and configuration specific // request parameters req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName()); if (postParseRequest(req, request, res, response)) { - event = request.getEvent(); - if ( event!=null && (event instanceof CometEventImpl)) - ((CometEventImpl)event).setWorkerThread(); // Calling the container connector.getContainer().getPipeline().getFirst().invoke(request, response); if (request.isComet()) { if (!response.isClosed() && !response.isError()) { - if (request.getAvailable()) { + if (request.isReadable()) { // Invoke a read event right away if there are available bytes - if (event(req, res, SocketStatus.OPEN)) { + if (event(req, res, SocketStatus.OPEN_READ)) { comet = true; res.action(ActionCode.ACTION_COMET_BEGIN, null); } @@ -301,9 +311,6 @@ } catch (Throwable t) { log.error(sm.getString("coyoteAdapter.service"), t); } finally { - if ( event!=null && (event instanceof CometEventImpl)) - ((CometEventImpl)event).unsetWorkerThread(); - req.getRequestProcessor().setWorkerThreadName(null); // Recycle the wrapper request and response if (!comet) { Modified: 
tomcat/sandbox/comet/java/org/apache/catalina/connector/OutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/connector/OutputBuffer.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/connector/OutputBuffer.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/connector/OutputBuffer.java Tue May 29 04:24:13 2007 @@ -323,6 +323,14 @@ } + + /** + * Return the amount of bytes written by the lower layer. + */ + protected int lastWrite() { + return coyoteResponse.getLastWrite(); + } + // ------------------------------------------------- Bytes Handling Methods Modified: tomcat/sandbox/comet/java/org/apache/catalina/connector/Request.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/connector/Request.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/connector/Request.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/connector/Request.java Tue May 29 04:24:13 2007 @@ -2252,16 +2252,45 @@ /** * Return true if bytes are available. */ - public boolean getAvailable() { + public boolean isReadable() { return (inputBuffer.available() > 0); } - // ------------------------------------------------------ Protected Methods + /** + * Set connection timeout. + */ + public void setTimeout(int timeout) { + coyoteRequest.action(ActionCode.ACTION_COMET_TIMEOUT, new Integer(timeout)); + } + + + /** + * Configure notifications that will be recieved. 
+ * + * @param read if true, read events will be sent to the servlet + * @param write if true, the connection will be placed in non blocking mode, and write notifications + * may be requested by using the sendNotify method + */ + public void configure(boolean read, boolean write) { + coyoteRequest.action(ActionCode.ACTION_COMET_READ_NOTIFICATIONS, Boolean.valueOf(read)); + coyoteRequest.action(ActionCode.ACTION_COMET_WRITE_NOTIFICATIONS, Boolean.valueOf(write)); + } - protected void action(ActionCode actionCode, Object param) { - coyoteRequest.action(actionCode,param); + /** + * Send a notify event to the servlet. + * + * @param write with the value true should be called when isWriteable becomes false, to request notification + * when the connection becomes available for writing again; with the value false, a notify event + * will be sent to the servlet + */ + public void callback(boolean write) { + coyoteRequest.action(ActionCode.ACTION_COMET_CALLBACK, Boolean.valueOf(write)); } + + + // ------------------------------------------------------ Protected Methods + protected Session doGetSession(boolean create) { Modified: tomcat/sandbox/comet/java/org/apache/catalina/connector/Response.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/catalina/connector/Response.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/catalina/connector/Response.java (original) +++ tomcat/sandbox/comet/java/org/apache/catalina/connector/Response.java Tue May 29 04:24:13 2007 @@ -529,6 +529,14 @@ } + /** + * Return true if bytes are available. 
+ */ + public boolean isWriteable() { + return (outputBuffer.lastWrite() > 0); + } + + // ------------------------------------------------ ServletResponse Methods Modified: tomcat/sandbox/comet/java/org/apache/coyote/ActionCode.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/coyote/ActionCode.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/coyote/ActionCode.java (original) +++ tomcat/sandbox/comet/java/org/apache/coyote/ActionCode.java Tue May 29 04:24:13 2007 @@ -159,19 +159,18 @@ /** * Configure a Comet connection */ - public static final ActionCode ACTION_COMET_CONFIGURE = new ActionCode(25); + public static final ActionCode ACTION_COMET_CALLBACK = new ActionCode(25); /** - * Register notifications for events for a certain comet connection + * Register notifications for read events */ - public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26); + public static final ActionCode ACTION_COMET_READ_NOTIFICATIONS = new ActionCode(26); /** - * Unregister for notifications for a comet connection + * Register notifications for write events */ - public static final ActionCode ACTION_COMET_UNREGISTER = new ActionCode(27); + public static final ActionCode ACTION_COMET_WRITE_NOTIFICATIONS = new ActionCode(27); - // ----------------------------------------------------------- Constructors int code; Modified: tomcat/sandbox/comet/java/org/apache/coyote/Response.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/coyote/Response.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/coyote/Response.java (original) +++ tomcat/sandbox/comet/java/org/apache/coyote/Response.java Tue May 29 04:24:13 2007 @@ -124,6 +124,8 @@ protected Request req; + protected int lastWrite = 1; + // 
------------------------------------------------------------- Properties public Request getRequest() { @@ -188,6 +190,16 @@ // -------------------- State -------------------- + public int getLastWrite() { + return lastWrite; + } + + + public void setLastWrite(int lastWrite) { + this.lastWrite = lastWrite; + } + + public int getStatus() { return status; } @@ -579,6 +591,7 @@ headers.clear(); // update counters + lastWrite = 1; bytesWritten=0; } Modified: tomcat/sandbox/comet/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/tomcat/util/net/AprEndpoint.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/sandbox/comet/java/org/apache/tomcat/util/net/AprEndpoint.java Tue May 29 04:24:13 2007 @@ -1301,7 +1301,7 @@ // Check for failed sockets and hand this socket off to a worker if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) - || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN))) + || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ))) || (!comet && (!processSocket(desc[n*2+1])))) { // Close socket and clear pool if (comet) { Modified: tomcat/sandbox/comet/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/sandbox/comet/java/org/apache/tomcat/util/net/NioEndpoint.java Tue May 29 04:24:13 2007 @@ -1492,7 +1492,7 @@ //check if thread is available if ( isWorkerAvailable() ) { unreg(sk, attachment, sk.readyOps()); - if 
(!processSocket(channel, SocketStatus.OPEN)) + if (!processSocket(channel, SocketStatus.OPEN_READ)) processSocket(channel, SocketStatus.DISCONNECT); } else { result = false; Modified: tomcat/sandbox/comet/java/org/apache/tomcat/util/net/SocketStatus.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/comet/java/org/apache/tomcat/util/net/SocketStatus.java?view=diff&rev=542491&r1=542490&r2=542491 ============================================================================== --- tomcat/sandbox/comet/java/org/apache/tomcat/util/net/SocketStatus.java (original) +++ tomcat/sandbox/comet/java/org/apache/tomcat/util/net/SocketStatus.java Tue May 29 04:24:13 2007 @@ -23,5 +23,5 @@ * @author remm */ public enum SocketStatus { - OPEN, STOP, TIMEOUT, DISCONNECT, ERROR + OPEN_READ, OPEN_WRITE, OPEN_CALLBACK, STOP, TIMEOUT, DISCONNECT, ERROR } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]