Author: fhanik
Date: Thu Mar 25 20:11:32 2010
New Revision: 927576

URL: http://svn.apache.org/viewvc?rev=927576&view=rev
Log:
prevent duplicate threads processing the same socket

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=927576&r1=927575&r2=927576&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Thu Mar 25 20:11:32 2010 @@ -33,6 +33,8 @@ import org.apache.tomcat.util.net.Abstra import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; import org.apache.tomcat.util.net.NioEndpoint.SocketProcessor; +import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; + /** * Handle incoming TCP connections. * @@ -228,47 +230,61 @@ public class JIoEndpoint extends Abstrac protected SocketStatus status = null; public SocketProcessor(SocketWrapper<Socket> socket) { + if (socket==null) throw new NullPointerException(); this.socket = socket; } public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) { - this.socket = socket; + this(socket); this.status = status; } public void run() { - SocketState state = SocketState.OPEN; - // Process the request from this socket - if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) { - state = SocketState.CLOSED; - } - socket.setInitialized(true); - - if ( (state != SocketState.CLOSED) ) { - state = (status==null)?handler.process(socket):handler.process(socket,status); - } - if (state == SocketState.CLOSED) { - // Close socket - if (log.isTraceEnabled()) { - log.trace("Closing socket:"+socket); - } - try { - socket.getSocket().close(); - } catch (IOException e) { - // Ignore - } - } else if (state == SocketState.OPEN){ - socket.setKeptAlive(true); - socket.access(); - //keepalive connection - //TODO - servlet3 check async status, we 
may just be in a hold pattern - getExecutor().execute(new SocketProcessor(socket)); - } else if (state == SocketState.LONG) { - socket.access(); - waitingRequests.add(socket); + boolean launch = false; + try { + + if (!socket.processing.compareAndSet(false, true)) { + log.error("Unable to process socket. Invalid state."); + return; + } + + SocketState state = SocketState.OPEN; + // Process the request from this socket + if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) { + state = SocketState.CLOSED; + } + socket.setInitialized(true); + + if ( (state != SocketState.CLOSED) ) { + state = (status==null)?handler.process(socket):handler.process(socket,status); + } + if (state == SocketState.CLOSED) { + // Close socket + if (log.isTraceEnabled()) { + log.trace("Closing socket:"+socket); + } + try { + socket.getSocket().close(); + } catch (IOException e) { + // Ignore + } + } else if (state == SocketState.OPEN){ + socket.setKeptAlive(true); + socket.access(); + //keepalive connection + //TODO - servlet3 check async status, we may just be in a hold pattern + launch = true; + } else if (state == SocketState.LONG) { + socket.access(); + waitingRequests.add(socket); + } + } finally { + socket.processing.set(false); + if (launch) getExecutor().execute(new SocketProcessor(socket)); + socket = null; } // Finish up this request - socket = null; + } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=927576&r1=927575&r2=927576&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Mar 25 20:11:32 2010 @@ -16,6 +16,8 @@ */ package org.apache.tomcat.util.net; +import java.util.concurrent.atomic.AtomicBoolean; + public class SocketWrapper<E> { @@ 
-30,19 +32,16 @@ public class SocketWrapper<E> { protected boolean async = false; protected boolean keptAlive = false; protected boolean initialized = false; + public AtomicBoolean processing = new AtomicBoolean(false); public SocketWrapper(E socket) { - reset(socket); + this.socket = socket; } public E getSocket() { return socket; } - public void reset(E socket) { - this.socket = socket; - } - public boolean isAsync() { return async; } public void setAsync(boolean async) { this.async = async; } public long getLastAccess() { return lastAccess; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org