Author: fhanik Date: Thu May 31 12:32:33 2007 New Revision: 543226 URL: http://svn.apache.org/viewvc?view=rev&rev=543226 Log: 1. Timeouts are now per connection, not using fixed timeouts anywhere. by default the connection gets the timeout defined in server.xml 2. Implemented all Comet operations, including the ability to have none 3. Implemented CometEvent.isReadable and isWriteable isAvailable - means data is available to the servlet isReadable - means there is data from the socket also checks the socket, by doing a read, in a non blocking fashion to verify this to be true isWriteable - the last write attempted on this socket was 0, hence we are probably blocking 4. simplified CometEvent.register/unregister, they are now just one call and no syncs 5. After each event, the connection is registered with the same operations it had before 6. CoyoteAdapter respects when the servlet doesn't want to be notified of the READ event, hence it doesn't invoke it automatically 7. Let me know if MutableBoolean and MutableInteger should be elsewhere(in terms of package), they are used since ActionHook doesn't have a return value and also valuable in the output buffers since SSL writing is two steps, one through the engine and the other to the socket I'm pretty happy with how isReadable,isWriteable works, they are completly non blocking and very accurate True non blocking in the buffers and filters seems like a major surgery, still holding off on that. Need to fix the NioBlockingSelector as it is almost impossible to make the poller interest declaration thread safe
Added: tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java Modified: tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java tomcat/trunk/java/org/apache/catalina/connector/Request.java tomcat/trunk/java/org/apache/catalina/connector/Response.java tomcat/trunk/java/org/apache/coyote/ActionCode.java tomcat/trunk/java/org/apache/coyote/Response.java tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Modified: tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/CometEventImpl.java Thu May 31 12:32:33 2007 @@ -29,6 +29,7 @@ import org.apache.coyote.ActionCode; import org.apache.tomcat.util.net.PollerInterest; import java.util.Arrays; +import org.apache.tomcat.util.MutableBoolean; public class CometEventImpl implements CometEvent { @@ -142,12 +143,16 @@ } public boolean isReadable() { - return request.isReadable(); + return request.isAvailable() || request.isReadable(); } public boolean isWriteable() { return response.isWriteable(); } + public boolean hasOp(CometEvent.CometOperation op) { + return cometOperations.contains(op); + } + public void configure(CometEvent.CometConfiguration... options) throws IOException, IllegalStateException { checkWorkerThread(); @@ -169,7 +174,7 @@ throws IOException, IllegalStateException { //remove from the registered set cometOperations.removeAll(Arrays.asList(operations)); - request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0]))); + request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0]))); } public CometConfiguration[] getConfiguration() { Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Thu May 31 12:32:33 2007 @@ -227,7 +227,7 @@ } if (response.isClosed() || !request.isComet()) { res.action(ActionCode.ACTION_COMET_END, null); - } else if (!error && read && request.isReadable()) { + } else if (!error && read && request.isAvailable()) { // If this was a read and not all bytes have been read, or if no data // was read from the connector, then it is an error error = true; @@ -312,7 +312,7 @@ if (request.isComet()) { if (!response.isClosed() && !response.isError()) { - if (request.isReadable()) { + if (request.isAvailable() && request.hasOp(CometEvent.CometOperation.OP_READ)) { // Invoke a read event right away if there are available bytes if (event(req, res, SocketStatus.OPEN_READ)) { comet = true; Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Thu May 31 12:32:33 2007 @@ -324,13 +324,6 @@ } - /** - * Return the amount of bytes written by the lower layer. - */ - protected int lastWrite() { - return coyoteResponse.getLastWrite(); - } - // ------------------------------------------------- Bytes Handling Methods Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Thu May 31 12:32:33 2007 @@ -70,6 +70,8 @@ import org.apache.catalina.util.RequestUtil; import org.apache.catalina.util.StringManager; import org.apache.catalina.util.StringParser; +import org.apache.tomcat.util.MutableBoolean; +import org.apache.catalina.CometEvent; /** @@ -2250,13 +2252,26 @@ /** - * Return true if bytes are available. + * Return true if bytes are available at the servlet layer */ - public boolean isReadable() { + public boolean isAvailable() { return (inputBuffer.available() > 0); } - + /** + * returns true if we read data from the socket + * @return boolean + */ + public boolean isReadable() { + MutableBoolean bool = new MutableBoolean(false); + action(ActionCode.ACTION_COMET_READABLE,bool); + return bool.get(); + } + + public boolean hasOp(CometEvent.CometOperation op) { + if ( !comet || getEvent()==null ) return false; + return event.hasOp(op); + } // ------------------------------------------------------ Protected Methods protected void action(ActionCode actionCode, Object param) { Modified: tomcat/trunk/java/org/apache/catalina/connector/Response.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Response.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/Response.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Thu May 31 12:32:33 2007 @@ -52,6 +52,8 @@ import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.http.ServerCookie; import org.apache.tomcat.util.net.URL; +import org.apache.coyote.ActionCode; +import org.apache.tomcat.util.MutableBoolean; /** * Wrapper object for the Coyote response. @@ -533,7 +535,9 @@ * Return true if bytes are available. */ public boolean isWriteable() { - return (outputBuffer.lastWrite() > 0); + MutableBoolean bool = new MutableBoolean(false); + coyoteResponse.action(ActionCode.ACTION_COMET_WRITEABLE,bool); + return bool.get(); } Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original) +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu May 31 12:32:33 2007 @@ -167,11 +167,14 @@ public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26); /** - * Unregister for notifications for a comet connection + * Action for getting the readable status */ - public static final ActionCode ACTION_COMET_UNREGISTER = new ActionCode(27); - + public static final ActionCode ACTION_COMET_READABLE = new ActionCode(28); + /** + * Action for getting the writeable status + */ + public static final ActionCode ACTION_COMET_WRITEABLE = new ActionCode(29); // ----------------------------------------------------------- Constructors int code; Modified: tomcat/trunk/java/org/apache/coyote/Response.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/coyote/Response.java (original) +++ tomcat/trunk/java/org/apache/coyote/Response.java Thu May 31 12:32:33 2007 @@ -123,8 +123,6 @@ protected String errorURI = null; protected Request req; - - protected int lastWrite = 1; // ------------------------------------------------------------- Properties @@ -190,16 +188,6 @@ // -------------------- State -------------------- - public int getLastWrite() { - return lastWrite; - } - - - public void setLastWrite(int lastWrite) { - this.lastWrite = lastWrite; - } - - public int getStatus() { return status; } @@ -591,7 +579,6 @@ headers.clear(); // update counters - lastWrite = 1; bytesWritten=0; } Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Thu May 31 12:32:33 2007 @@ -54,6 +54,7 @@ import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; import org.apache.tomcat.util.net.PollerInterest; +import org.apache.tomcat.util.MutableBoolean; /** @@ -91,12 +92,12 @@ request = new Request(); int readTimeout = endpoint.getSoTimeout(); - inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout); + inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize); request.setInputBuffer(inputBuffer); response = new Response(); response.setHook(this); - outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout); + outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize); response.setOutputBuffer(outputBuffer); request.setResponse(response); @@ -819,7 +820,6 @@ try { if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { socket.getIOChannel().socket().setSoTimeout((int)soTimeout); - inputBuffer.readTimeout = soTimeout; } if (!inputBuffer.parseRequestLine(keptAlive)) { //no data available yet, since we might have read part @@ -839,7 +839,6 @@ request.setStartTime(System.currentTimeMillis()); if (!disableUploadTimeout) { //only for body, not for request headers socket.getIOChannel().socket().setSoTimeout((int)timeout); - inputBuffer.readTimeout = soTimeout; } } catch (IOException e) { error = true; @@ -1223,20 +1222,22 @@ } else if (actionCode == ActionCode.ACTION_COMET_REGISTER) { int interest = getPollerInterest(param); NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - attach.setCometOps(attach.getCometOps()|interest); - //notify poller if not on a tomcat thread - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) - socket.getPoller().cometInterest(socket); - } else if (actionCode == ActionCode.ACTION_COMET_UNREGISTER) { - int interest = getPollerInterest(param); - NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - attach.setCometOps(attach.getCometOps()& (~interest)); + attach.setCometOps(interest); //notify poller if not on a tomcat thread RequestInfo rp = request.getRequestProcessor(); if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) socket.getPoller().cometInterest(socket); } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE) { + } else if (actionCode == ActionCode.ACTION_COMET_READABLE) { + MutableBoolean bool = (MutableBoolean)param; + try { + bool.set(inputBuffer.isReadable()); + }catch ( IOException x ) { + throw new RuntimeException(x); + } + } else if (actionCode == ActionCode.ACTION_COMET_WRITEABLE) { + MutableBoolean bool = (MutableBoolean)param; + bool.set(outputBuffer.isWritable()); } } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Thu May 31 12:32:33 2007 @@ -30,6 +30,7 @@ import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.net.NioEndpoint; /** * Implementation of InputBuffer which provides HTTP request header parsing as @@ -51,8 +52,7 @@ /** * Alternate constructor. */ - public InternalNioInputBuffer(Request request, int headerBufferSize, - long readTimeout) { + public InternalNioInputBuffer(Request request, int headerBufferSize) { this.request = request; headers = request.getMimeHeaders(); @@ -80,12 +80,6 @@ headerData.recycle(); swallowInput = true; - if (readTimeout < 0) { - this.readTimeout = -1; - } else { - this.readTimeout = readTimeout; - } - } @@ -195,12 +189,6 @@ protected int lastActiveFilter; - /** - * The socket timeout used when reading the first block of the request - * header. - */ - protected long readTimeout; - // ------------------------------------------------------------- Properties @@ -296,7 +284,23 @@ } // --------------------------------------------------------- Public Methods - + /** + * Returns true if there are bytes available from the socket layer + * @return boolean + * @throws IOException + */ + public boolean isReadable() throws IOException { + return (pos < lastValid) || (nbRead()>0); + } + + /** + * Issues a non blocking read + * @return int + * @throws IOException + */ + public int nbRead() throws IOException { + return readSocket(true,false); + } /** * Recycle the input buffer. This should be called when closing the @@ -413,13 +417,8 @@ if (useAvailableData) { return false; } - if (readTimeout == -1) { - if (!fill(false,true)) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); - } else { - // Do a simple read with a short timeout - if ( !readSocket(true, false) ) return false; - } + // Do a simple read with a short timeout + if ( readSocket(true, false)==0 ) return false; } chr = buf[pos++]; } while ((chr == Constants.CR) || (chr == Constants.LF)); @@ -434,13 +433,8 @@ if (useAvailableData) { return false; } - if (readTimeout == -1) { - if (!fill(false,false)) //request line parsing - return false; - } else { - // Do a simple read with a short timeout - if ( !readSocket(true, false) ) return false; - } + // Do a simple read with a short timeout + if ( readSocket(true, false)==0 ) return false; } parsingRequestLinePhase = 2; } @@ -552,6 +546,7 @@ tmp = null; } } + /** * Perform blocking read with a timeout if desired * @param timeout boolean - if we want to use the timeout data @@ -560,15 +555,16 @@ * @throws IOException if a socket exception occurs * @throws EOFException if end of stream is reached */ - private boolean readSocket(boolean timeout, boolean block) throws IOException { + private int readSocket(boolean timeout, boolean block) throws IOException { int nRead = 0; - long rto = timeout?this.readTimeout:-1; socket.getBufHandler().getReadBuffer().clear(); if ( block ) { Selector selector = null; try { selector = getSelectorPool().get(); }catch ( IOException x ) {} try { - nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto); + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled."); + nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout()); } catch ( EOFException eof ) { nRead = -1; } finally { @@ -583,12 +579,12 @@ expand(nRead + pos); socket.getBufHandler().getReadBuffer().get(buf, pos, nRead); lastValid = pos + nRead; - return true; + return nRead; } else if (nRead == -1) { //return false; throw new EOFException(sm.getString("iib.eof.error")); } else { - return false; + return 0; } } @@ -852,7 +848,7 @@ } // Do a simple read with a short timeout - read = readSocket(timeout,block); + read = readSocket(timeout,block)>0; } else { if (buf.length - end < 4500) { @@ -865,7 +861,7 @@ pos = end; lastValid = pos; // Do a simple read with a short timeout - read = readSocket(timeout, block); + read = readSocket(timeout, block)>0; } return read; } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu May 31 12:32:33 2007 @@ -34,6 +34,8 @@ import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import java.io.EOFException; +import org.apache.tomcat.util.MutableInteger; /** * Output buffer. @@ -56,14 +58,14 @@ * Default constructor. */ public InternalNioOutputBuffer(Response response) { - this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000); + this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE); } /** * Alternate constructor. */ - public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) { + public InternalNioOutputBuffer(Response response, int headerBufferSize) { this.response = response; headers = response.getMimeHeaders(); @@ -86,8 +88,6 @@ committed = false; finished = false; - this.writeTimeout = writeTimeout; - // Cause loading of HttpMessages HttpMessages.getMessage(200); @@ -142,6 +142,10 @@ */ protected int pos; + /** + * Number of bytes last written + */ + protected MutableInteger lastWrite = new MutableInteger(1); /** * Underlying socket. @@ -179,12 +183,6 @@ */ protected int lastActiveFilter; - /** - * Write time out in milliseconds - */ - protected long writeTimeout = -1; - - // ------------------------------------------------------------- Properties @@ -195,10 +193,6 @@ this.socket = socket; } - public void setWriteTimeout(long writeTimeout) { - this.writeTimeout = writeTimeout; - } - /** * Get the underlying socket input stream. */ @@ -206,10 +200,6 @@ return socket; } - public long getWriteTimeout() { - return writeTimeout; - } - public void setSelectorPool(NioSelectorPool pool) { this.pool = pool; } @@ -324,7 +314,6 @@ // Recycle Request object response.recycle(); - } @@ -343,6 +332,7 @@ lastActiveFilter = -1; committed = false; finished = false; + lastWrite.set(1); } @@ -401,7 +391,9 @@ } - + public boolean isWritable() { + return lastWrite.get()>0; + } // ------------------------------------------------ HTTP/1.1 Output Methods @@ -414,15 +406,25 @@ if (!committed) { //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0 ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length); - writeToSocket(buf,false); + writeToSocket(buf,false,true); } } - private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException { + /** + * + * @param bytebuffer ByteBuffer + * @param flip boolean + * @return int + * @throws IOException + */ + private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip, boolean block) throws IOException { //int limit = bytebuffer.position(); if ( flip ) bytebuffer.flip(); int written = 0; + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled"); + long writeTimeout = att.getTimeout(); Selector selector = null; try { selector = getSelectorPool().get(); @@ -430,10 +432,10 @@ //ignore } try { - written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout); + written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite); //make sure we are flushed do { - if (socket.flush(true,selector,writeTimeout)) break; + if (socket.flush(true,selector,writeTimeout,lastWrite)) break; }while ( true ); }finally { if ( selector != null ) getSelectorPool().put(selector); @@ -759,7 +761,7 @@ //write to the socket, if there is anything to write if (socket.getBufHandler().getWriteBuffer().position() > 0) { - writeToSocket(socket.getBufHandler().getWriteBuffer(),true); + writeToSocket(socket.getBufHandler().getWriteBuffer(),true,true); } } Added: tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java?view=auto&rev=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/MutableBoolean.java Thu May 31 12:32:33 2007 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util; + +public class MutableBoolean { + protected boolean value = false; + public MutableBoolean() {} + public MutableBoolean(boolean val) { + this.value = val; + } + + public boolean get() { return value;} + public void set(boolean val) {this.value = val;} +} Added: tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java?view=auto&rev=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/MutableInteger.java Thu May 31 12:32:33 2007 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util; + +public class MutableInteger { + protected int value = 0; + public MutableInteger() {} + public MutableInteger(int val) { + this.value = val; + } + + public int get() { return value;} + public void set(int val) {this.value = val;} +} Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Thu May 31 12:32:33 2007 @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import org.apache.tomcat.util.MutableInteger; public class NioBlockingSelector { public NioBlockingSelector() { @@ -41,7 +42,7 @@ * @throws SocketTimeoutException if the write times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException { + public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int written = 0; boolean timedout = false; @@ -55,6 +56,7 @@ while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write int cnt = socket.write(buf); //write the data + lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java Thu May 31 12:32:33 2007 @@ -27,6 +27,7 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; +import org.apache.tomcat.util.MutableInteger; /** * @@ -70,7 +71,8 @@ * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s,long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { + if (lastWrite!=null) lastWrite.set(1); return true; //no network buffer in the regular channel } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu May 31 12:32:33 2007 @@ -479,24 +479,13 @@ /** * The socket poller. */ - protected Poller[] pollers = null; - protected int pollerRoundRobin = 0; + protected Poller poller = null; public Poller getPoller0() { - pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; - Poller poller = pollers[pollerRoundRobin]; return poller; } - - /** - * The socket poller used for Comet support. - */ - public Poller getCometPoller0() { - Poller poller = getPoller0(); - return poller; - } - - + protected Poller readWritePoller = null; + /** * Dummy maxSpareThreads property. */ @@ -649,14 +638,10 @@ * Number of keepalive sockets. */ public int getKeepAliveCount() { - if (pollers == null) { + if (poller == null) { return 0; } else { - int keepAliveCount = 0; - for (int i = 0; i < pollers.length; i++) { - keepAliveCount += pollers[i].getKeepAliveCount(); - } - return keepAliveCount; + return poller.selector.keys().size(); } } @@ -793,16 +778,12 @@ acceptorThread.start(); } - // Start poller threads - pollers = new Poller[pollerThreadCount]; - for (int i = 0; i < pollerThreadCount; i++) { - pollers[i] = new Poller(); - pollers[i].init(); - Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); - } + // Start poller thread + poller = new Poller(); + Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); } } @@ -836,10 +817,8 @@ if (running) { running = false; unlockAccept(); - for (int i = 0; i < pollers.length; i++) { - pollers[i].destroy(); - } - pollers = null; + poller.destroy(); + poller = null; } eventCache.clear(); keyCache.clear(); @@ -1118,6 +1097,8 @@ protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { + KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); + attachment.setCometNotify(false); //will get reset upon next reg if (executor == null) { getWorkerThread().assign(socket, status); } else { @@ -1248,8 +1229,9 @@ interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. - att.interestOps(interestOps); - key.interestOps(interestOps); + int ops = key.interestOps() | interestOps; + att.interestOps(ops); + key.interestOps(ops); } else { cancel = true; } @@ -1269,6 +1251,7 @@ return super.toString()+"[intOps="+this.interestOps+"]"; } } + /** * Poller class. */ @@ -1279,9 +1262,6 @@ protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling - - protected int keepAliveCount = 0; - public int getKeepAliveCount() { return keepAliveCount; } protected AtomicLong wakeupCounter = new AtomicLong(0l); @@ -1296,14 +1276,6 @@ public Selector getSelector() { return selector;} /** - * Create the poller. With some versions of APR, the maximum poller size will - * be 62 (reocmpiling APR is necessary to remove this limitation). - */ - protected void init() { - keepAliveCount = 0; - } - - /** * Destroy the poller. */ protected void destroy() { @@ -1379,7 +1351,7 @@ socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(); - ka.reset(this,socket); + ka.reset(this,socket,getSocketProperties().getSoTimeout()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); @@ -1621,7 +1593,6 @@ } else if ( ka.getError() ) { cancelledKey(key, SocketStatus.ERROR,true); } else if (ka.getComet() && ka.getCometNotify() ) { - ka.setCometNotify(false);//this will get reset after invokation if callback is still in there reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT); }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { @@ -1656,13 +1627,13 @@ public KeyAttachment() { } - public void reset(Poller poller, NioChannel channel) { + public void reset(Poller poller, NioChannel channel, long soTimeout) { this.channel = channel; this.poller = poller; lastAccess = System.currentTimeMillis(); currentAccess = false; comet = false; - timeout = -1; + timeout = soTimeout; error = false; lastRegistered = 0; sendfileData = null; @@ -1676,7 +1647,7 @@ } public void reset() { - reset(null,null); + reset(null,null,-1); } public Poller getPoller() { return poller;} Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Thu May 31 12:32:33 2007 @@ -16,17 +16,20 @@ */ package org.apache.tomcat.util.net; -import java.util.concurrent.atomic.AtomicInteger; -import java.nio.channels.Selector; +import java.io.EOFException; import java.io.IOException; -import java.util.NoSuchElementException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.io.EOFException; -import java.net.SocketTimeoutException; +import java.nio.channels.Selector; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.MutableInteger; +import java.util.Iterator; /** * @@ -37,20 +40,23 @@ */ public class NioSelectorPool { + protected static int threadCount = 0; + protected static Log log = LogFactory.getLog(NioSelectorPool.class); protected final static boolean SHARED = Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue(); - protected static Selector SHARED_SELECTOR; + protected Selector SHARED_SELECTOR; protected int maxSelectors = 200; + protected long sharedSelectorTimeout = 30000; protected int maxSpareSelectors = -1; protected boolean enabled = true; protected AtomicInteger active = new AtomicInteger(0); protected AtomicInteger spare = new AtomicInteger(0); protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>(); - protected static Selector getSharedSelector() throws IOException { + protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { @@ -127,12 +133,13 @@ * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException { - return write(buf,socket,selector,writeTimeout,true); + return write(buf,socket,selector,writeTimeout,true,null); } - public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException { - if ( SHARED && block) { - return NioBlockingSelector.write(buf,socket,writeTimeout); + public int write(ByteBuffer buf, NioChannel socket, Selector selector, + long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException { + if ( SHARED && block ) { + return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite); } SelectionKey key = null; int written = 0; @@ -148,7 +155,9 @@ int cnt = 0; if ( keycount > 0 ) { //only write if we were registered for a write cnt = socket.write(buf); //write the data + if (lastWrite!=null) lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); + written += cnt; if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer @@ -206,7 +215,7 @@ * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { - if ( SHARED && block) { + if ( SHARED && block ) { return NioBlockingSelector.read(buf,socket,readTimeout); } SelectionKey key = null; @@ -254,6 +263,10 @@ this.enabled = enabled; } + public void setSharedSelectorTimeout(long sharedSelectorTimeout) { + this.sharedSelectorTimeout = sharedSelectorTimeout; + } + public int getMaxSelectors() { return maxSelectors; } @@ -264,5 +277,17 @@ public boolean isEnabled() { return enabled; + } + + public long getSharedSelectorTimeout() { + return sharedSelectorTimeout; + } + + public ConcurrentLinkedQueue getSelectors() { + return selectors; + } + + public AtomicInteger getSpare() { + return spare; } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?view=diff&rev=543226&r1=543225&r2=543226 ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Thu May 31 12:32:33 2007 @@ -25,6 +25,7 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import java.nio.channels.Selector; +import org.apache.tomcat.util.MutableInteger; /** * @@ -102,11 +103,11 @@ * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s, long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { if (!block) { flush(netOutBuffer); } else { - pool.write(netOutBuffer, this, s, timeout); + pool.write(netOutBuffer, this, s, timeout,block,lastWrite); } return !netOutBuffer.hasRemaining(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]