Author: markt
Date: Tue Mar 28 14:47:56 2017
New Revision: 1789155

URL: http://svn.apache.org/viewvc?rev=1789155&view=rev
Log:
Follow-up to r1789024
Need to add the socket/processor to the connection cache when sendfile is in 
progress so that any pipe-lined request is processed
Add syncs to AprEndpoint so that only one thread can process a socket at a 
time; otherwise the transfer between sendfile and normal processing can result 
in concurrent socket usage.

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java?rev=1789155&r1=1789154&r2=1789155&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 
Tue Mar 28 14:47:56 2017
@@ -812,6 +812,7 @@ public class Http11AprProcessor implemen
 
         boolean keptAlive = false;
         boolean openSocket = false;
+        boolean sendfileInProgress = false;
 
         while (!error && keepAlive && !comet) {
 
@@ -941,7 +942,7 @@ public class Http11AprProcessor implemen
                         }
                         error = true;
                     } else {
-                        openSocket = true;
+                        sendfileInProgress = true;
                     }
                     break;
                 }
@@ -962,6 +963,8 @@ public class Http11AprProcessor implemen
             } else {
                 return SocketState.LONG;
             }
+        } else if (sendfileInProgress) {
+            return SocketState.SENDFILE;
         } else {
             recycle();
             return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1789155&r1=1789154&r2=1789155&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 
Tue Mar 28 14:47:56 2017
@@ -601,8 +601,14 @@ public class Http11AprProtocol extends A
         }
 
         public SocketState process(long socket) {
-            Http11AprProcessor processor = recycledProcessors.poll();
+            Http11AprProcessor processor = null;
             try {
+                processor = connections.remove(Long.valueOf(socket));
+
+                if (processor == null) {
+                    processor = recycledProcessors.poll();
+                }
+
                 if (processor == null) {
                     processor = createProcessor();
                 }
@@ -616,6 +622,17 @@ public class Http11AprProtocol extends A
                     // processor.
                     connections.put(Long.valueOf(socket), processor);
                     proto.endpoint.getCometPoller().add(socket);
+                } else if (state == SocketState.OPEN) {
+                    // In keep-alive but between requests. OK to recycle
+                    // processor. Continue to poll for the next request.
+                    recycledProcessors.offer(processor);
+                    proto.endpoint.getPoller().add(socket);
+                } else if (state == SocketState.SENDFILE) {
+                    // Sendfile in progress. If it fails, the socket will be
+                    // closed. If it works, the socket either be added to the
+                    // poller to await more data or processed if there are any
+                    // pipe-lined requests remaining.
+                    connections.put(socket, processor);
                 } else {
                     recycledProcessors.offer(processor);
                 }

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=1789155&r1=1789154&r2=1789155&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
Tue Mar 28 14:47:56 2017
@@ -769,6 +769,7 @@ public class Http11NioProtocol extends A
                     // closed. If it works, the socket either be added to the
                     // poller to await more data or processed if there are any
                     // pipe-lined requests remaining.
+                    connections.put(socket, processor);
                 } else {
                     // Connection closed. OK to recycle the processor.
                     release(socket, processor);

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1789155&r1=1789154&r2=1789155&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Tue 
Mar 28 14:47:56 2017
@@ -22,6 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import org.apache.juli.logging.Log;
@@ -161,6 +163,8 @@ public class AprEndpoint extends Abstrac
     /* Acceptor thread array */
     private Acceptor acceptors[] = null;
 
+    private Map<Long,Object> locks = new ConcurrentHashMap<Long, Object>();
+
     // ------------------------------------------------------------- Properties
 
 
@@ -1244,6 +1248,8 @@ public class AprEndpoint extends Abstrac
      * Process given socket.
      */
     protected boolean processSocketWithOptions(long socket) {
+        Long key = Long.valueOf(socket);
+        locks.put(key, new Object());
         try {
             if (executor == null) {
                 getWorkerThread().assignWithOptions(socket);
@@ -1254,6 +1260,7 @@ public class AprEndpoint extends Abstrac
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
             log.error(sm.getString("endpoint.process.fail"), t);
+            locks.remove(key);
             return false;
         }
         return true;
@@ -1300,6 +1307,8 @@ public class AprEndpoint extends Abstrac
     }
 
     private void destroySocket(long socket) {
+        Long key = Long.valueOf(socket);
+        locks.remove(key);
         if (running && socket != 0) {
             // If not running the socket will be destroyed by
             // parent pool or acceptor socket.
@@ -1769,16 +1778,21 @@ public class AprEndpoint extends Abstrac
                     }
                 } else {
 
-                    // Process the request from this socket
-                    if ((status != null) && (handler.event(socket, status) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        destroySocket(socket);
-                        socket = 0;
-                    } else if ((status == null) && ((options && 
!setSocketOptions(socket))
-                            || handler.process(socket) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        destroySocket(socket);
-                        socket = 0;
+                    Long key = Long.valueOf(socket);
+                    Object lock = locks.get(key);
+
+                    synchronized (lock) {
+                        // Process the request from this socket
+                        if ((status != null) && (handler.event(socket, status) 
== Handler.SocketState.CLOSED)) {
+                            // Close socket and pool
+                            destroySocket(socket);
+                            socket = 0;
+                        } else if ((status == null) && ((options && 
!setSocketOptions(socket))
+                                || handler.process(socket) == 
Handler.SocketState.CLOSED)) {
+                            // Close socket and pool
+                            destroySocket(socket);
+                            socket = 0;
+                        }
                     }
                 }
 
@@ -2082,7 +2096,7 @@ public class AprEndpoint extends Abstrac
                                     Pool.destroy(state.fdpool);
                                     Socket.timeoutSet(state.socket, soTimeout 
* 1000);
                                     // Process the pipelined request data
-                                    if (!processSocket(state.socket, 
SocketStatus.OPEN)) {
+                                    if (!processSocket(state.socket, null)) {
                                         destroySocket(state.socket);
                                     }
                                     break;
@@ -2155,7 +2169,7 @@ public class AprEndpoint extends Abstrac
      */
     public interface Handler {
         public enum SocketState {
-            OPEN, CLOSED, LONG
+            OPEN, CLOSED, LONG, SENDFILE
         }
         public SocketState process(long socket);
         public SocketState event(long socket, SocketStatus status);
@@ -2268,16 +2282,19 @@ public class AprEndpoint extends Abstrac
                 }
             } else {
                 // Process the request from this socket
-                if (!setSocketOptions(socket)
-                        || handler.process(socket) == 
Handler.SocketState.CLOSED) {
-                    // Close socket and pool
-                    destroySocket(socket);
-                    socket = 0;
+                Long key = Long.valueOf(socket);
+                Object lock = locks.get(key);
+
+                synchronized (lock) {
+                    if (!setSocketOptions(socket)
+                            || handler.process(socket) == 
Handler.SocketState.CLOSED) {
+                        // Close socket and pool
+                        destroySocket(socket);
+                        socket = 0;
+                    }
                 }
             }
-
         }
-
     }
 
 
@@ -2298,13 +2315,17 @@ public class AprEndpoint extends Abstrac
 
         public void run() {
 
-            // Process the request from this socket
-            if (handler.process(socket) == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                destroySocket(socket);
-                socket = 0;
-            }
+            Long key = Long.valueOf(socket);
+            Object lock = locks.get(key);
 
+            synchronized (lock) {
+                // Process the request from this socket
+                if (handler.process(socket) == Handler.SocketState.CLOSED) {
+                    // Close socket and pool
+                    destroySocket(socket);
+                    socket = 0;
+                }
+            }
         }
 
     }
@@ -2329,16 +2350,17 @@ public class AprEndpoint extends Abstrac
 
         public void run() {
 
-            // Process the request from this socket
-            if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                destroySocket(socket);
-                socket = 0;
-            }
+            Long key = Long.valueOf(socket);
+            Object lock = locks.get(key);
 
+            synchronized (lock) {
+                // Process the request from this socket
+                if (handler.event(socket, status) == 
Handler.SocketState.CLOSED) {
+                    // Close socket and pool
+                    destroySocket(socket);
+                    socket = 0;
+                }
+            }
         }
-
     }
-
-
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1789155&r1=1789154&r2=1789155&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue 
Mar 28 14:47:56 2017
@@ -1854,7 +1854,7 @@ public class NioEndpoint extends Abstrac
                             if (log.isDebugEnabled()) {
                                 log.debug("Connection is keep alive, 
processing pipe-lined data");
                             }
-                            if (!processSocket(sc, SocketStatus.OPEN, true)) {
+                            if (!processSocket(sc, null, true)) {
                                 cancelledKey(sk, SocketStatus.DISCONNECT, 
false);
                             }
                             break;



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to