Author: fhanik
Date: Thu Mar 25 20:11:32 2010
New Revision: 927576

URL: http://svn.apache.org/viewvc?rev=927576&view=rev
Log:
prevent duplicate threads processing the same socket

Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=927576&r1=927575&r2=927576&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Thu Mar 25 20:11:32 2010
@@ -33,6 +33,8 @@ import org.apache.tomcat.util.net.Abstra
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.net.NioEndpoint.SocketProcessor;
 
+import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
+
 /**
  * Handle incoming TCP connections.
  *
@@ -228,47 +230,61 @@ public class JIoEndpoint extends Abstrac
         protected SocketStatus status = null;
         
         public SocketProcessor(SocketWrapper<Socket> socket) {
+            if (socket==null) throw new NullPointerException();
             this.socket = socket;
         }
 
         public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
-            this.socket = socket;
+            this(socket);
             this.status = status;
         }
 
         public void run() {
-               SocketState state = SocketState.OPEN;
-            // Process the request from this socket
-            if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) { 
-               state = SocketState.CLOSED;
-            }
-            socket.setInitialized(true);
-            
-            if ( (state != SocketState.CLOSED) ) {
-                state = (status==null)?handler.process(socket):handler.process(socket,status);
-            }
-            if (state == SocketState.CLOSED) {
-               // Close socket
-               if (log.isTraceEnabled()) {
-                       log.trace("Closing socket:"+socket);
-               }
-                try {
-                    socket.getSocket().close();
-                } catch (IOException e) {
-                    // Ignore
-                }
-            } else if (state == SocketState.OPEN){
-                socket.setKeptAlive(true);
-                socket.access();
-                //keepalive connection
-                //TODO - servlet3 check async status, we may just be in a hold pattern
-                getExecutor().execute(new SocketProcessor(socket));
-            } else if (state == SocketState.LONG) {
-                socket.access();
-                waitingRequests.add(socket);
+            boolean launch = false;
+               try {
+                   
+                   if (!socket.processing.compareAndSet(false, true)) {
+                       log.error("Unable to process socket. Invalid state.");
+                       return;
+                   }
+                   
+                   SocketState state = SocketState.OPEN;
+                   // Process the request from this socket
+                   if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) { 
+                       state = SocketState.CLOSED;
+                   }
+                   socket.setInitialized(true);
+
+                   if ( (state != SocketState.CLOSED) ) {
+                       state = (status==null)?handler.process(socket):handler.process(socket,status);
+                   }
+                   if (state == SocketState.CLOSED) {
+                       // Close socket
+                       if (log.isTraceEnabled()) {
+                           log.trace("Closing socket:"+socket);
+                       }
+                       try {
+                           socket.getSocket().close();
+                       } catch (IOException e) {
+                           // Ignore
+                       }
+                   } else if (state == SocketState.OPEN){
+                       socket.setKeptAlive(true);
+                       socket.access();
+                       //keepalive connection
+                       //TODO - servlet3 check async status, we may just be in a hold pattern
+                       launch = true;
+                   } else if (state == SocketState.LONG) {
+                       socket.access();
+                       waitingRequests.add(socket);
+                   }
+               } finally {
+                socket.processing.set(false);
+                if (launch) getExecutor().execute(new SocketProcessor(socket));
+                socket = null;
             }
             // Finish up this request
-            socket = null;
+            
         }
         
     }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=927576&r1=927575&r2=927576&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Mar 25 20:11:32 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.tomcat.util.net;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 
 public class SocketWrapper<E> {
     
@@ -30,19 +32,16 @@ public class SocketWrapper<E> {
     protected boolean async = false;
     protected boolean keptAlive = false;
     protected boolean initialized = false;
+    public AtomicBoolean processing = new AtomicBoolean(false);
     
     public SocketWrapper(E socket) {
-        reset(socket);
+        this.socket = socket;
     }
     
     public E getSocket() {
         return socket;
     }
     
-    public void reset(E socket) {
-        this.socket = socket;
-    }
-
     public boolean isAsync() { return async; }
     public void setAsync(boolean async) { this.async = async; }
     public long getLastAccess() { return lastAccess; }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to