Author: fhanik Date: Fri Mar 19 14:22:09 2010 New Revision: 925232 URL: http://svn.apache.org/viewvc?rev=925232&view=rev Log: More async stuff, only timeout left
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java tomcat/trunk/webapps/examples/jsp/async/index.jsp Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Fri Mar 19 14:22:09 2010 @@ -41,6 +41,7 @@ import org.apache.juli.logging.LogFactor import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.JIoEndpoint; import org.apache.tomcat.util.net.SocketWrapper; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.JIoEndpoint.Handler; import org.apache.tomcat.util.res.StringManager; @@ -362,7 +363,7 @@ public class AjpProtocol this.proto = proto; } - public boolean process(SocketWrapper<Socket> socket) { + public SocketState process(SocketWrapper<Socket> socket) { AjpProcessor processor = recycledProcessors.poll(); try { @@ -373,7 +374,7 @@ public class AjpProtocol processor.action(ActionCode.ACTION_START, null); processor.process(socket.getSocket()); - return false; + return SocketState.CLOSED; } catch(java.net.SocketException e) { // SocketExceptions are normal @@ -399,7 +400,7 @@ public class AjpProtocol processor.action(ActionCode.ACTION_STOP, null); recycledProcessors.offer(processor); } - return false; + return SocketState.CLOSED; } protected AjpProcessor createProcessor() { Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Fri Mar 19 14:22:09 2010 @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.Socket; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -47,7 +48,9 @@ import org.apache.tomcat.util.http.FastH import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.JIoEndpoint; import org.apache.tomcat.util.net.SSLSupport; +import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapper; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; /** @@ -109,11 +112,16 @@ public class Http11Processor extends Abs */ protected SSLSupport sslSupport; + /** + * Async used + */ + protected boolean async = false; + /** * Socket associated with the current connection. */ - protected Socket socket; + protected SocketWrapper<Socket> socket; @@ -151,9 +159,8 @@ public class Http11Processor extends Abs * * @throws IOException error during an I/O operation */ - public boolean process(SocketWrapper<Socket> socketWrapper) + public SocketState process(SocketWrapper<Socket> socketWrapper) throws IOException { - Socket theSocket = socketWrapper.getSocket(); RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); @@ -166,9 +173,9 @@ public class Http11Processor extends Abs localPort = -1; // Setting up the I/O - this.socket = theSocket; - inputBuffer.setInputStream(socket.getInputStream()); - outputBuffer.setOutputStream(socket.getOutputStream()); + this.socket = socketWrapper; + inputBuffer.setInputStream(socket.getSocket().getInputStream()); + outputBuffer.setOutputStream(socket.getSocket().getOutputStream()); // Error flag error = false; @@ -179,7 +186,7 @@ public class Http11Processor extends Abs int soTimeout = endpoint.getSoTimeout(); try { - socket.setSoTimeout(soTimeout); + socket.getSocket().setSoTimeout(soTimeout); } catch (Throwable t) { log.debug(sm.getString("http11processor.socket.timeout"), t); error = true; @@ -194,19 +201,19 @@ public class Http11Processor extends Abs //TODO - calculate timeout based on length in queue (System.currentTimeMills() - wrapper.getLastAccess() is the time in queue) if (keptAlive) { if (keepAliveTimeout > 0) { - socket.setSoTimeout(keepAliveTimeout); + socket.getSocket().setSoTimeout(keepAliveTimeout); } else if (soTimeout > 0) { - socket.setSoTimeout(soTimeout); + socket.getSocket().setSoTimeout(soTimeout); } } inputBuffer.parseRequestLine(false); request.setStartTime(System.currentTimeMillis()); keptAlive = true; if (disableUploadTimeout) { - socket.setSoTimeout(soTimeout); + socket.getSocket().setSoTimeout(soTimeout); } else { - socket.setSoTimeout(timeout); + socket.getSocket().setSoTimeout(timeout); } inputBuffer.parseHeaders(); } catch (IOException e) { @@ -270,11 +277,11 @@ public class Http11Processor extends Abs // If we know we are closing the connection, don't drain input. // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. - if(error) + + if(error && !async) inputBuffer.setSwallowInput(false); - inputBuffer.endRequest(); - } catch (IOException e) { - error = true; + if (!async) + endRequest(); } catch (Throwable t) { log.error(sm.getString("http11processor.request.finish"), t); // 500 - Internal Server Error @@ -283,9 +290,6 @@ public class Http11Processor extends Abs } try { rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); - outputBuffer.endRequest(); - } catch (IOException e) { - error = true; } catch (Throwable t) { log.error(sm.getString("http11processor.response.finish"), t); error = true; @@ -304,26 +308,104 @@ public class Http11Processor extends Abs // will reset it // thrA.setParam(null); // Next request - inputBuffer.nextRequest(); - outputBuffer.nextRequest(); + if (!async) { + inputBuffer.nextRequest(); + outputBuffer.nextRequest(); + } //hack keep alive behavior break; } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + if (async) { + if (error) { + recycle(); + return SocketState.CLOSED; + } else { + socket.setAsync(true); + return SocketState.LONG; + } + } else { + socket.setAsync(false); + if ( error || (!keepAlive)) { + recycle(); + return SocketState.CLOSED; + } else { + return SocketState.OPEN; + } + } + } + + + public SocketState asyncDispatch(SocketStatus status) throws IOException { + + RequestInfo rp = request.getRequestProcessor(); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.asyncDispatch(request, response, status); + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + if (async) { + if (error) { + recycle(); + return SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } else { + if ( error || (!keepAlive)) { + recycle(); + return SocketState.CLOSED; + } else { + return SocketState.OPEN; + } + } + } + + + public void endRequest() { + + // Finish the handling of the request + try { + inputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.finish"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + try { + outputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.response.finish"), t); + error = true; + } + + } + + + public void recycle() { // Recycle inputBuffer.recycle(); outputBuffer.recycle(); this.socket = null; + async = false; // Recycle ssl info sslSupport = null; - if (log.isTraceEnabled()) { - boolean returnvalue = (!error && keepAlive); - log.trace("Returning "+returnvalue+" to adjust for keep alive."); - } - return !error && keepAlive; } @@ -383,7 +465,7 @@ public class Http11Processor extends Abs } else if (actionCode == ActionCode.ACTION_CLOSE) { // Close - + async = false; // End the processing of the current request, and stop any further // transactions with the client @@ -443,7 +525,7 @@ public class Http11Processor extends Abs } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) { if ((remoteAddr == null) && (socket != null)) { - InetAddress inetAddr = socket.getInetAddress(); + InetAddress inetAddr = socket.getSocket().getInetAddress(); if (inetAddr != null) { remoteAddr = inetAddr.getHostAddress(); } @@ -453,7 +535,7 @@ public class Http11Processor extends Abs } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) { if ((localName == null) && (socket != null)) { - InetAddress inetAddr = socket.getLocalAddress(); + InetAddress inetAddr = socket.getSocket().getLocalAddress(); if (inetAddr != null) { localName = inetAddr.getHostName(); } @@ -463,7 +545,7 @@ public class Http11Processor extends Abs } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) { if ((remoteHost == null) && (socket != null)) { - InetAddress inetAddr = socket.getInetAddress(); + InetAddress inetAddr = socket.getSocket().getInetAddress(); if (inetAddr != null) { remoteHost = inetAddr.getHostName(); } @@ -480,21 +562,21 @@ public class Http11Processor extends Abs } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) { if (localAddr == null) - localAddr = socket.getLocalAddress().getHostAddress(); + localAddr = socket.getSocket().getLocalAddress().getHostAddress(); request.localAddr().setString(localAddr); } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) { if ((remotePort == -1 ) && (socket !=null)) { - remotePort = socket.getPort(); + remotePort = socket.getSocket().getPort(); } request.setRemotePort(remotePort); } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) { if ((localPort == -1 ) && (socket !=null)) { - localPort = socket.getLocalPort(); + localPort = socket.getSocket().getLocalPort(); } request.setLocalPort(localPort); @@ -530,12 +612,35 @@ public class Http11Processor extends Abs internalBuffer.addActiveFilter(savedBody); } else if (actionCode == ActionCode.ACTION_ASYNC_START) { //TODO SERVLET3 - async + async = true; } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) { //TODO SERVLET3 - async + AtomicBoolean dispatch = (AtomicBoolean)param; + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling + dispatch.set(true); + endpoint.processSocket(this.socket, SocketStatus.STOP); + } else { + //TODO SERVLET3 async=false + } } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) { //TODO SERVLET3 - async + if (param==null) return; + long timeout = ((Long)param).longValue(); + //if we are not piggy backing on a worker thread, set the timeout + socket.setAsyncTimeout(timeout); + } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) { + RequestInfo rp = request.getRequestProcessor(); + AtomicBoolean dispatch = (AtomicBoolean)param; + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + endpoint.processSocket(this.socket, SocketStatus.OPEN); + dispatch.set(true); + } else { + //TODO SERVLET3 - do nothing? + } } + } @@ -967,8 +1072,8 @@ public class Http11Processor extends Abs // HTTP/1.0 // Default is what the socket tells us. Overridden if a host is // found/parsed - request.setServerPort(socket.getLocalPort()); - InetAddress localAddress = socket.getLocalAddress(); + request.setServerPort(socket.getSocket().getLocalPort()); + InetAddress localAddress = socket.getSocket().getLocalAddress(); // Setting the socket-related fields. The adapter doesn't know // about socket. request.serverName().setString(localAddress.getHostName()); Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java Fri Mar 19 14:22:09 2010 @@ -19,6 +19,7 @@ package org.apache.coyote.http11; import java.net.Socket; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -30,9 +31,12 @@ import org.apache.coyote.RequestGroupInf import org.apache.coyote.RequestInfo; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.JIoEndpoint; +import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.SSLImplementation; import org.apache.tomcat.util.net.ServerSocketFactory; +import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapper; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.JIoEndpoint.Handler; @@ -182,6 +186,8 @@ public class Http11Protocol extends Abst protected Http11Protocol proto; protected AtomicLong registerCount = new AtomicLong(0); protected RequestGroupInfo global = new RequestGroupInfo(); + protected ConcurrentHashMap<SocketWrapper, Http11Processor> connections = + new ConcurrentHashMap<SocketWrapper, Http11Processor>(); protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors = new ConcurrentLinkedQueue<Http11Processor>() { @@ -227,14 +233,16 @@ public class Http11Protocol extends Abst this.proto = proto; } - public boolean process(SocketWrapper<Socket> socket) { - Http11Processor processor = recycledProcessors.poll(); + + public SocketState process(SocketWrapper<Socket> socket) { + Http11Processor processor = connections.remove(socket); try { - + if (processor == null) { + processor = recycledProcessors.poll(); + } if (processor == null) { processor = createProcessor(); } - processor.action(ActionCode.ACTION_START, null); if (proto.isSSLEnabled() && (proto.sslImplementation != null)) { @@ -244,9 +252,13 @@ public class Http11Protocol extends Abst processor.setSSLSupport(null); } - return processor.process(socket); - //return false; - + SocketState state = socket.isAsync()?processor.asyncDispatch(SocketStatus.OPEN):processor.process(socket); + if (state == SocketState.LONG) { + connections.put(socket, processor); + } else { + connections.remove(socket); + } + return state; } catch(java.net.SocketException e) { // SocketExceptions are normal Http11Protocol.log.debug @@ -274,7 +286,7 @@ public class Http11Protocol extends Abst processor.action(ActionCode.ACTION_STOP, null); recycledProcessors.offer(processor); } - return false; + return SocketState.CLOSED; } protected Http11Processor createProcessor() { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Fri Mar 19 14:22:09 2010 @@ -22,11 +22,15 @@ import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.IntrospectionUtils; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import org.apache.tomcat.util.net.NioEndpoint.SocketProcessor; /** * Handle incoming TCP connections. @@ -100,6 +104,8 @@ public class JIoEndpoint extends Abstrac public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; } + + // ------------------------------------------------ Handler Inner Interface @@ -110,7 +116,7 @@ public class JIoEndpoint extends Abstrac * thread local fields. */ public interface Handler { - public boolean process(SocketWrapper<Socket> socket); + public SocketState process(SocketWrapper<Socket> socket); } @@ -185,16 +191,17 @@ public class JIoEndpoint extends Abstrac } public void run() { - boolean close = false; + SocketState state = SocketState.OPEN; // Process the request from this socket - if ( (!socket.isKeptAlive()) && (!setSocketOptions(socket.getSocket())) ) { //this does a handshake and resets socket value - close = true; + if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) { + state = SocketState.CLOSED; } + socket.setInitialized(true); - if ( (!close) ) { - close = !handler.process(socket); + if ( (state != SocketState.CLOSED) ) { + state = handler.process(socket); } - if (close) { + if (state == SocketState.CLOSED) { // Close socket if (log.isTraceEnabled()) { log.trace("Closing socket:"+socket); @@ -204,12 +211,15 @@ public class JIoEndpoint extends Abstrac } catch (IOException e) { // Ignore } - } else { + } else if (state == SocketState.OPEN){ socket.setKeptAlive(true); socket.access(); //keepalive connection //TODO - servlet3 check async status, we may just be in a hold pattern getExecutor().execute(new SocketProcessor(socket)); + } else if (state == SocketState.LONG) { + socket.access(); + waitingRequests.add(socket); } // Finish up this request socket = null; @@ -430,5 +440,31 @@ public class JIoEndpoint extends Abstrac return true; } + public boolean processSocket(SocketWrapper<Socket> socket, SocketStatus status) { + try { + if (status == SocketStatus.OPEN || status == SocketStatus.STOP) { + if (waitingRequests.remove(socket)) { + SocketProcessor proc = new SocketProcessor(socket); + getExecutor().execute(proc); + } + } + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + + protected ConcurrentLinkedQueue<SocketWrapper> waitingRequests = new ConcurrentLinkedQueue<SocketWrapper>(); + + protected class RequestProcessor implements Runnable { + @Override + public void run() { + + } + + } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Fri Mar 19 14:22:09 2010 @@ -29,6 +29,8 @@ public class SocketWrapper<E> { protected volatile int keepAliveLeft = 100; protected boolean async = false; protected boolean keptAlive = false; + protected boolean initialized = false; + protected long asyncTimeout = 0; public SocketWrapper(E socket) { reset(socket); @@ -58,4 +60,9 @@ public class SocketWrapper<E> { public int decrementKeepAlive() { return (--keepAliveLeft);} public boolean isKeptAlive() {return keptAlive;} public void setKeptAlive(boolean keptAlive) {this.keptAlive = keptAlive;} + public boolean isInitialized() {return initialized;} + public void setInitialized(boolean initialized) {this.initialized = initialized;} + public long getAsyncTimeout() {return asyncTimeout;} + public void setAsyncTimeout(long asyncTimeout) {this.asyncTimeout = asyncTimeout;} + } Modified: tomcat/trunk/webapps/examples/jsp/async/index.jsp URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/jsp/async/index.jsp?rev=925232&r1=925231&r2=925232&view=diff ============================================================================== --- tomcat/trunk/webapps/examples/jsp/async/index.jsp (original) +++ tomcat/trunk/webapps/examples/jsp/async/index.jsp Fri Mar 19 14:22:09 2010 @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. --> -...@page session="false"%> +<%...@page session="false"%> <pre> Use cases: --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org