This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch fix-bz-66508 in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit f09171e654dfedcdf9657973427bd9c62b732ad0 Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri Mar 24 17:21:04 2023 +0000 Further fix for BZ 66508 https://bz.apache.org/bugzilla/show_bug.cgi?id=66508 Avoid deadlock for close messages when WsRemoteEndpointImplServer.endMessage() for a previous message is processed on a container thread --- .../tomcat/websocket/WsRemoteEndpointImplBase.java | 24 +++++-- .../websocket/server/WsHttpUpgradeHandler.java | 3 +- .../server/WsRemoteEndpointImplServer.java | 76 +++++++++++++++++++++- 3 files changed, 97 insertions(+), 6 deletions(-) diff --git a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java index eec3381a85..9091dab205 100644 --- a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java +++ b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java @@ -66,7 +66,7 @@ public abstract class WsRemoteEndpointImplBase implements RemoteEndpoint { private final IntermediateMessageHandler intermediateMessageHandler = new IntermediateMessageHandler(this); private Transformation transformation = null; - private final Semaphore messagePartInProgress = new Semaphore(1); + protected final Semaphore messagePartInProgress = new Semaphore(1); private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>(); private final Object messagePartLock = new Object(); @@ -288,9 +288,8 @@ public abstract class WsRemoteEndpointImplBase implements RemoteEndpoint { return; } - long timeout = timeoutExpiry - System.currentTimeMillis(); try { - if (!messagePartInProgress.tryAcquire(timeout, TimeUnit.MILLISECONDS)) { + if (!acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry)) { String msg = sm.getString("wsRemoteEndpoint.acquireTimeout"); wsSession.doClose(new CloseReason(CloseCodes.GOING_AWAY, msg), new CloseReason(CloseCodes.CLOSED_ABNORMALLY, msg), true); @@ -334,6 +333,23 @@ public abstract class WsRemoteEndpointImplBase implements 
RemoteEndpoint { } + /** + * Acquire the semaphore that allows a message part to be written. + * + * @param opCode The OPCODE for the message to be written + * @param timeoutExpiry The time when the attempt to acquire the semaphore should expire + * + * @return {@code true} if the semaphore is obtained, otherwise {@code false}. + * + * @throws InterruptedException If the wait for the semaphore is interrupted + */ + protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry) + throws InterruptedException { + long timeout = timeoutExpiry - System.currentTimeMillis(); + return messagePartInProgress.tryAcquire(timeout, TimeUnit.MILLISECONDS); + } + + void startMessage(byte opCode, ByteBuffer payload, boolean last, SendHandler handler) { wsSession.updateLastActiveWrite(); @@ -392,7 +408,7 @@ public abstract class WsRemoteEndpointImplBase implements RemoteEndpoint { } - void endMessage(SendHandler handler, SendResult result) { + protected void endMessage(SendHandler handler, SendResult result) { boolean doWrite = false; MessagePart mpNext = null; synchronized (messagePartLock) { diff --git a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java index b180e52a74..a71580e418 100644 --- a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java +++ b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java @@ -117,7 +117,8 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler { ClassLoader cl = t.getContextClassLoader(); t.setContextClassLoader(applicationClassLoader); try { - wsRemoteEndpointServer = new WsRemoteEndpointImplServer(socketWrapper, upgradeInfo, webSocketContainer); + wsRemoteEndpointServer = + new WsRemoteEndpointImplServer(socketWrapper, upgradeInfo, webSocketContainer, connection); wsSession = new WsSession(wsRemoteEndpointServer, webSocketContainer, handshakeRequest.getRequestURI(), 
handshakeRequest.getParameterMap(), handshakeRequest.getQueryString(), handshakeRequest.getUserPrincipal(), httpSessionId, negotiatedExtensions, subProtocol, diff --git a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java index 8dd5974328..67e97a111d 100644 --- a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java +++ b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java @@ -24,6 +24,7 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import jakarta.servlet.http.WebConnection; import jakarta.websocket.SendHandler; import jakarta.websocket.SendResult; @@ -33,6 +34,7 @@ import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.websocket.Constants; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsRemoteEndpointImplBase; @@ -47,16 +49,19 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase { private final SocketWrapperBase<?> socketWrapper; private final UpgradeInfo upgradeInfo; + private final WebConnection connection; private final WsWriteTimeout wsWriteTimeout; + private final Object semaphoreManagementLock = new Object(); private volatile SendHandler handler = null; private volatile ByteBuffer[] buffers = null; private volatile long timeoutExpiry = -1; public WsRemoteEndpointImplServer(SocketWrapperBase<?> socketWrapper, UpgradeInfo upgradeInfo, - WsServerContainer serverContainer) { + WsServerContainer serverContainer, WebConnection connection) { this.socketWrapper = socketWrapper; this.upgradeInfo = upgradeInfo; + this.connection = connection; this.wsWriteTimeout = serverContainer.getTimeout(); } @@ 
-67,6 +72,75 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase { } + /** + * {@inheritDoc} + * <p> + * The close message is a special case. It needs to be blocking else implementing the clean-up that follows the + * sending of the close message gets a lot more complicated. On the server, this creates additional complications + * as a dead-lock may occur in the following scenario: + * <ol> + * <li>Application thread writes message using non-blocking</li> + * <li>Write does not complete (write logic holds message pending lock)</li> + * <li>Socket is added to poller (or equivalent) for write</li> + * <li>Client sends close message</li> + * <li>Container processes received close message and tries to send close message in response</li> + * <li>Container holds socket lock and is blocked waiting for message pending lock</li> + * <li>Poller fires write possible event for socket</li> + * <li>Container tries to process write possible event but is blocked waiting for socket lock</li> + * <li>Processing of the WebSocket connection is dead-locked until the original message write times out</li> + * </ol> + * The purpose of this method is to break the above dead-lock. It does this by returning control of the processor to + * the socket wrapper and releasing the socket lock while waiting for the pending message write to complete. + * Normally, that would be a terrible idea as it creates the possibility that the processor is returned to the pool + * more than once under various error conditions. In this instance it is safe because these are upgrade processors + * (isUpgrade() returns {@code true}) and upgrade processors are never pooled. + * <p> + * TODO: Despite the complications it creates, it would be worth exploring the possibility of processing a received + * close frame in a non-blocking manner.
+ */ + @Override + protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry) + throws InterruptedException { + + // Only close requires special handling. + if (opCode != Constants.OPCODE_CLOSE) { + return super.acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry); + } + + synchronized (semaphoreManagementLock) { + while (!messagePartInProgress.tryAcquire()) { + long timeout = timeoutExpiry - System.currentTimeMillis(); + if (timeout < 0) { + return false; + } + try { + // Release control of the processor + socketWrapper.setCurrentProcessor(connection); + // Release the per socket lock + socketWrapper.getLock().unlock(); + // Wait for another thread to call #endMessage() + semaphoreManagementLock.wait(timeout); + } finally { + // Re-obtain the per socket lock + socketWrapper.getLock().lock(); + // Re-take control of the processor + socketWrapper.takeCurrentProcessor(); + } + } + } + return true; + } + + + @Override + protected void endMessage(SendHandler handler, SendResult result) { + synchronized (semaphoreManagementLock) { + super.endMessage(handler, result); + semaphoreManagementLock.notifyAll(); + } + } + + @Override protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... buffers) { if (socketWrapper.hasAsyncIO()) { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org