This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch fix-bz-66508
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit f09171e654dfedcdf9657973427bd9c62b732ad0
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Fri Mar 24 17:21:04 2023 +0000

    Further fix for BZ 66508
    
    https://bz.apache.org/bugzilla/show_bug.cgi?id=66508
    
    Avoid deadlock for close messages when
    WsRemoteEndpointImplServer.endMessage() for a previous message is
    processed on a container thread
---
 .../tomcat/websocket/WsRemoteEndpointImplBase.java | 24 +++++--
 .../websocket/server/WsHttpUpgradeHandler.java     |  3 +-
 .../server/WsRemoteEndpointImplServer.java         | 76 +++++++++++++++++++++-
 3 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
index eec3381a85..9091dab205 100644
--- a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
+++ b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
@@ -66,7 +66,7 @@ public abstract class WsRemoteEndpointImplBase implements 
RemoteEndpoint {
     private final IntermediateMessageHandler intermediateMessageHandler = new 
IntermediateMessageHandler(this);
 
     private Transformation transformation = null;
-    private final Semaphore messagePartInProgress = new Semaphore(1);
+    protected final Semaphore messagePartInProgress = new Semaphore(1);
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
     private final Object messagePartLock = new Object();
 
@@ -288,9 +288,8 @@ public abstract class WsRemoteEndpointImplBase implements 
RemoteEndpoint {
             return;
         }
 
-        long timeout = timeoutExpiry - System.currentTimeMillis();
         try {
-            if (!messagePartInProgress.tryAcquire(timeout, 
TimeUnit.MILLISECONDS)) {
+            if (!acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry)) 
{
                 String msg = sm.getString("wsRemoteEndpoint.acquireTimeout");
                 wsSession.doClose(new CloseReason(CloseCodes.GOING_AWAY, msg),
                         new CloseReason(CloseCodes.CLOSED_ABNORMALLY, msg), 
true);
@@ -334,6 +333,23 @@ public abstract class WsRemoteEndpointImplBase implements 
RemoteEndpoint {
     }
 
 
+    /**
+     * Attempts to obtain the permit that must be held while a message part is written, giving up once the
+     * supplied expiry time has been reached.
+     *
+     * @param opCode        The OPCODE of the message about to be written. It is not used by this implementation
+     *                          but is available to overriding implementations.
+     * @param timeoutExpiry The time, on the {@link System#currentTimeMillis()} clock, at which the attempt to
+     *                          obtain the permit is abandoned
+     *
+     * @return {@code true} if the permit was obtained, otherwise {@code false}
+     *
+     * @throws InterruptedException If the calling thread is interrupted while waiting for the permit
+     */
+    protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry)
+            throws InterruptedException {
+        // Convert the absolute expiry time into the relative wait the semaphore API expects.
+        final long remainingMillis = timeoutExpiry - System.currentTimeMillis();
+        final boolean acquired = messagePartInProgress.tryAcquire(remainingMillis, TimeUnit.MILLISECONDS);
+        return acquired;
+    }
+
+
     void startMessage(byte opCode, ByteBuffer payload, boolean last, 
SendHandler handler) {
 
         wsSession.updateLastActiveWrite();
@@ -392,7 +408,7 @@ public abstract class WsRemoteEndpointImplBase implements 
RemoteEndpoint {
     }
 
 
-    void endMessage(SendHandler handler, SendResult result) {
+    protected void endMessage(SendHandler handler, SendResult result) {
         boolean doWrite = false;
         MessagePart mpNext = null;
         synchronized (messagePartLock) {
diff --git a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
index b180e52a74..a71580e418 100644
--- a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
+++ b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
@@ -117,7 +117,8 @@ public class WsHttpUpgradeHandler implements 
InternalHttpUpgradeHandler {
         ClassLoader cl = t.getContextClassLoader();
         t.setContextClassLoader(applicationClassLoader);
         try {
-            wsRemoteEndpointServer = new 
WsRemoteEndpointImplServer(socketWrapper, upgradeInfo, webSocketContainer);
+            wsRemoteEndpointServer =
+                    new WsRemoteEndpointImplServer(socketWrapper, upgradeInfo, 
webSocketContainer, connection);
             wsSession = new WsSession(wsRemoteEndpointServer, 
webSocketContainer, handshakeRequest.getRequestURI(),
                     handshakeRequest.getParameterMap(), 
handshakeRequest.getQueryString(),
                     handshakeRequest.getUserPrincipal(), httpSessionId, 
negotiatedExtensions, subProtocol,
diff --git a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
index 8dd5974328..67e97a111d 100644
--- a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
+++ b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
@@ -24,6 +24,7 @@ import java.nio.channels.CompletionHandler;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import jakarta.servlet.http.WebConnection;
 import jakarta.websocket.SendHandler;
 import jakarta.websocket.SendResult;
 
@@ -33,6 +34,7 @@ import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.net.SocketWrapperBase;
 import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.websocket.Constants;
 import org.apache.tomcat.websocket.Transformation;
 import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
 
@@ -47,16 +49,19 @@ public class WsRemoteEndpointImplServer extends 
WsRemoteEndpointImplBase {
 
     private final SocketWrapperBase<?> socketWrapper;
     private final UpgradeInfo upgradeInfo;
+    private final WebConnection connection;
     private final WsWriteTimeout wsWriteTimeout;
+    private final Object semaphoreManagementLock = new Object();
     private volatile SendHandler handler = null;
     private volatile ByteBuffer[] buffers = null;
 
     private volatile long timeoutExpiry = -1;
 
     public WsRemoteEndpointImplServer(SocketWrapperBase<?> socketWrapper, 
UpgradeInfo upgradeInfo,
-            WsServerContainer serverContainer) {
+            WsServerContainer serverContainer, WebConnection connection) {
         this.socketWrapper = socketWrapper;
         this.upgradeInfo = upgradeInfo;
+        this.connection = connection;
         this.wsWriteTimeout = serverContainer.getTimeout();
     }
 
@@ -67,6 +72,75 @@ public class WsRemoteEndpointImplServer extends 
WsRemoteEndpointImplBase {
     }
 
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The close message is a special case. It needs to be blocking else implementing the clean-up that follows the
+     * sending of the close message gets a lot more complicated. On the server, this creates additional complications
+     * as a dead-lock may occur in the following scenario:
+     * <ol>
+     *   <li>Application thread writes message using non-blocking</li>
+     *   <li>Write does not complete (write logic holds message pending lock)</li>
+     *   <li>Socket is added to poller (or equivalent) for write</li>
+     *   <li>Client sends close message</li>
+     *   <li>Container processes received close message and tries to send close message in response</li>
+     *   <li>Container holds socket lock and is blocked waiting for message pending lock</li>
+     *   <li>Poller fires write possible event for socket</li>
+     *   <li>Container tries to process write possible event but is blocked waiting for socket lock</li>
+     *   <li>Processing of the WebSocket connection is dead-locked until the original message write times out</li>
+     * </ol>
+     * The purpose of this method is to break the above dead-lock. It does this by returning control of the processor to
+     * the socket wrapper and releasing the socket lock while waiting for the pending message write to complete.
+     * Normally, that would be a terrible idea as it creates the possibility that the processor is returned to the pool
+     * more than once under various error conditions. In this instance it is safe because these are upgrade processors
+     * (isUpgrade() returns {@code true}) and upgrade processors are never pooled.
+     * <p>
+     * For any OPCODE other than close the inherited behaviour is used unchanged. For close, the attempt is abandoned
+     * and {@code false} is returned once {@code timeoutExpiry} has been reached.
+     * <p>
+     * TODO: Despite the complications it creates, it would be worth exploring the possibility of processing a received
+     * close frame in a non-blocking manner.
+     */
+    @Override
+    protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry)
+            throws InterruptedException {
+
+        // Only close requires special handling.
+        if (opCode != Constants.OPCODE_CLOSE) {
+            return super.acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry);
+        }
+
+        synchronized (semaphoreManagementLock) {
+            while (!messagePartInProgress.tryAcquire()) {
+                long timeout = timeoutExpiry - System.currentTimeMillis();
+                // A remaining time of exactly zero must be treated as expired: Object.wait(0) waits with no time
+                // limit, which would turn the timeout into an unbounded wait.
+                if (timeout <= 0) {
+                    return false;
+                }
+                try {
+                    // Release control of the processor
+                    socketWrapper.setCurrentProcessor(connection);
+                    // Release the per socket lock
+                    socketWrapper.getLock().unlock();
+                    // Wait for another thread to call #endMessage()
+                    semaphoreManagementLock.wait(timeout);
+                } finally {
+                    // NOTE(review): the socket lock is re-obtained while semaphoreManagementLock is held. Confirm
+                    // that no thread can hold the socket lock while waiting to enter #endMessage().
+                    // Re-obtain the per socket lock
+                    socketWrapper.getLock().lock();
+                    // Re-take control of the processor
+                    socketWrapper.takeCurrentProcessor();
+                }
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Completes the current message by delegating to the inherited implementation while holding
+     * {@code semaphoreManagementLock}, then wakes every thread waiting on that lock. A thread waiting in
+     * {@link #acquireMessagePartInProgressSemaphore(byte, long)} to send a close message is thereby prompted to
+     * re-try acquiring the message part semaphore.
+     *
+     * @param handler The handler passed through unchanged to the inherited implementation
+     * @param result  The result passed through unchanged to the inherited implementation
+     */
+    @Override
+    protected void endMessage(SendHandler handler, SendResult result) {
+        synchronized (semaphoreManagementLock) {
+            super.endMessage(handler, result);
+            // Wake any thread waiting on this lock now that the inherited processing has run.
+            semaphoreManagementLock.notifyAll();
+        }
+    }
+
+
     @Override
     protected void doWrite(SendHandler handler, long 
blockingWriteTimeoutExpiry, ByteBuffer... buffers) {
         if (socketWrapper.hasAsyncIO()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to