This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
     new 36f134465d Fix BZ 66574 - refactor close to avoid possible deadlock
36f134465d is described below

commit 36f134465dea9e2d7edf23e1b0cf5d0f571032d3
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Apr 25 21:09:30 2023 +0100

    Fix BZ 66574 - refactor close to avoid possible deadlock

    https://bz.apache.org/bugzilla/show_bug.cgi?id=66574
---
 java/org/apache/tomcat/websocket/WsSession.java | 124 ++++++++++++------------
 webapps/docs/changelog.xml                      |   5 +
 2 files changed, 69 insertions(+), 60 deletions(-)
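For context, the deadlock this commit removes has roughly the following shape. The sketch below is illustrative only (invented class, thread, and variable names; not Tomcat code): under simulated blocking, a writer thread may hold the per-connection lock while it waits for a slow network write to complete, and the old doClose() blocked on that same lock, so the close that would have released the writer could never run.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantLock;

    public class CloseDeadlockSketch {

        // Stand-ins for the per-connection lock and the pending network write.
        private static final ReentrantLock socketLock = new ReentrantLock();
        private static final CountDownLatch writeComplete = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            // Simulated-blocking writer: takes the lock, then waits for the
            // write to complete. Only closing the connection would release it.
            Thread writer = new Thread(() -> {
                socketLock.lock();
                try {
                    writeComplete.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    socketLock.unlock();
                }
            }, "writer");

            // Closer modelled on the old doClose(): it needs the same lock
            // before it can complete the write and close the connection.
            Thread closer = new Thread(() -> {
                socketLock.lock();
                try {
                    writeComplete.countDown();
                } finally {
                    socketLock.unlock();
                }
            }, "closer");

            writer.start();
            Thread.sleep(100); // let the writer take the lock first
            closer.start();

            closer.join(1000);
            // Prints true: the closer is stuck behind the writer, which in
            // turn is waiting for the closer to run.
            System.out.println("closer blocked: " + closer.isAlive());
        }
    }

Removing the lock from the close path means the closer never queues behind a stalled writer, which is what the diff below does.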
diff --git a/java/org/apache/tomcat/websocket/WsSession.java b/java/org/apache/tomcat/websocket/WsSession.java
index 8aad5a70ff..b8fe3b1db4 100644
--- a/java/org/apache/tomcat/websocket/WsSession.java
+++ b/java/org/apache/tomcat/websocket/WsSession.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.naming.NamingException;
 
@@ -107,7 +107,7 @@ public class WsSession implements Session {
     // Expected to handle message types of <ByteBuffer> only
     private volatile MessageHandler binaryMessageHandler = null;
     private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = null;
-    private volatile State state = State.OPEN;
+    private AtomicReference<State> state = new AtomicReference<>(State.OPEN);
     private final Map<String, Object> userProperties = new ConcurrentHashMap<>();
     private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
     private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
@@ -459,12 +459,12 @@ public class WsSession implements Session {
 
     @Override
     public boolean isOpen() {
-        return state == State.OPEN;
+        return state.get() == State.OPEN;
     }
 
 
     public boolean isClosed() {
-        return state == State.CLOSED;
+        return state.get() == State.CLOSED;
     }
 
 
@@ -564,46 +564,38 @@ public class WsSession implements Session {
      * @param closeSocket Should the socket be closed immediately rather than waiting for the server to respond
      */
     public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal, boolean closeSocket) {
-        // Double-checked locking. OK because state is volatile
-        if (state != State.OPEN) {
+
+        if (!state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING)) {
+            // Close process has already been started. Don't start it again.
             return;
         }
 
-        wsRemoteEndpoint.getLock().lock();
-        try {
-            if (state != State.OPEN) {
-                return;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("wsSession.doClose", id));
-            }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("wsSession.doClose", id));
+        }
 
-            // This will trigger a flush of any batched messages.
-            try {
-                wsRemoteEndpoint.setBatchingAllowed(false);
-            } catch (IOException e) {
-                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
-                fireEndpointOnError(e);
-            }
+        // Flush any batched messages not yet sent.
+        try {
+            wsRemoteEndpoint.setBatchingAllowed(false);
+        } catch (IOException e) {
+            log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+            fireEndpointOnError(e);
+        }
 
+        // Send the close message to the remote endpoint.
+        sendCloseMessage(closeReasonMessage);
+        fireEndpointOnClose(closeReasonLocal);
+
+        if (!state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED) || closeSocket) {
             /*
-             * If the flush above fails the error handling could call this method recursively. Without this check, the
-             * close message and notifications could be sent multiple times.
+             * A close message was received in another thread or this is handling an error condition. Either way, no
+             * further close message is expected to be received. Mark the session as fully closed...
              */
-            if (state != State.OUTPUT_CLOSED) {
-                state = State.OUTPUT_CLOSED;
-
-                sendCloseMessage(closeReasonMessage);
-                if (closeSocket) {
-                    wsRemoteEndpoint.close();
-                }
-                fireEndpointOnClose(closeReasonLocal);
-            }
-        } finally {
-            wsRemoteEndpoint.getLock().unlock();
+            state.set(State.CLOSED);
+            // ... and close the network connection.
+            wsRemoteEndpoint.close();
         }
 
+        // Fail any uncompleted messages.
         IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
         SendResult sr = new SendResult(ioe);
         for (FutureToSendHandler f2sh : futures.keySet()) {
@@ -619,30 +611,40 @@ public class WsSession implements Session {
      * @param closeReason The reason contained within the received close message.
      */
     public void onClose(CloseReason closeReason) {
+        if (state.compareAndSet(State.OPEN, State.CLOSING)) {
+            // Standard close.
 
-        Lock lock = wsRemoteEndpoint.getLock();
-        lock.lock();
-        try {
-            if (state != State.CLOSED) {
-                try {
-                    wsRemoteEndpoint.setBatchingAllowed(false);
-                } catch (IOException e) {
-                    log.warn(sm.getString("wsSession.flushFailOnClose"), e);
-                    fireEndpointOnError(e);
-                }
-                if (state == State.OPEN) {
-                    state = State.OUTPUT_CLOSED;
-                    sendCloseMessage(closeReason);
-                    fireEndpointOnClose(closeReason);
-                }
-                state = State.CLOSED;
-
-                // Close the socket
-                wsRemoteEndpoint.close();
+            // Flush any batched messages not yet sent.
+            try {
+                wsRemoteEndpoint.setBatchingAllowed(false);
+            } catch (IOException e) {
+                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                fireEndpointOnError(e);
             }
-        } finally {
-            lock.unlock();
+
+            // Send the close message response to the remote endpoint.
+            sendCloseMessage(closeReason);
+            fireEndpointOnClose(closeReason);
+
+            // Mark the session as fully closed.
+            state.set(State.CLOSED);
+
+            // Close the network connection.
+            wsRemoteEndpoint.close();
+        } else if (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) {
+            /*
+             * The local endpoint sent a close message at the same time as the remote endpoint. The local close is
+             * still being processed. Update the state so that the local close process will also close the network
+             * connection once it has finished sending a close message.
+             */
+        } else if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) {
+            /*
+             * The local endpoint sent the first close message. The remote endpoint has now responded with its own
+             * close message so mark the session as fully closed and close the network connection.
+             */
+            wsRemoteEndpoint.close();
         }
+        // CLOSING and CLOSED are NO-OPs
     }
 
 
@@ -790,13 +792,13 @@ public class WsSession implements Session {
         // Always register the future.
         futures.put(f2sh, f2sh);
 
-        if (state == State.OPEN) {
+        if (isOpen()) {
             // The session is open. The future has been registered with the open
             // session. Normal processing continues.
             return;
         }
 
-        // The session is closed. The future may or may not have been registered
+        // The session is closing / closed. The future may or may not have been registered
         // in time for it to be processed during session closure.
 
         if (f2sh.isDone()) {
@@ -806,7 +808,7 @@ public class WsSession implements Session {
             return;
         }
 
-        // The session is closed. The Future had not completed when last checked.
+        // The session is closing / closed. The Future had not completed when last checked.
         // There is a small timing window that means the Future may have been
         // completed since the last check. There is also the possibility that
        // the Future was not registered in time to be cleaned up during session
@@ -966,7 +968,7 @@ public class WsSession implements Session {
 
     private void checkState() {
-        if (state == State.CLOSED) {
+        if (isClosed()) {
            /*
             * As per RFC 6455, a WebSocket connection is considered to be closed once a peer has sent and received a
             * WebSocket close frame.
@@ -977,7 +979,9 @@ public class WsSession implements Session {
 
     private enum State {
         OPEN,
+        OUTPUT_CLOSING,
         OUTPUT_CLOSED,
+        CLOSING,
         CLOSED
     }
 
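Condensed, the refactored close logic is a lock-free state machine driven by compareAndSet(). The sketch below uses invented class and method names, and omits the closeSocket handling, batching flush, and error paths; it shows only the handshake the new doClose()/onClose() implement:

    import java.util.concurrent.atomic.AtomicReference;

    public class CloseStateSketch {

        enum State { OPEN, OUTPUT_CLOSING, OUTPUT_CLOSED, CLOSING, CLOSED }

        private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

        // Local close, mirroring doClose(): only the thread that wins the
        // OPEN -> OUTPUT_CLOSING race sends the close message.
        void localClose() {
            if (!state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING)) {
                return; // A close is already in progress.
            }
            sendCloseMessage();
            if (!state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED)) {
                // A remote close arrived while we were sending: no further
                // close message is expected, so finish the close here.
                state.set(State.CLOSED);
                closeConnection();
            }
        }

        // Remote close message received, mirroring onClose().
        void remoteClose() {
            if (state.compareAndSet(State.OPEN, State.CLOSING)) {
                sendCloseMessage();   // Respond to the remote close...
                state.set(State.CLOSED);
                closeConnection();    // ...and close the connection.
            } else if (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) {
                // Simultaneous close: the in-progress local close sees this
                // transition and closes the connection when it finishes.
            } else if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) {
                closeConnection();    // The remote replied to our close message.
            }
            // CLOSING and CLOSED: nothing left to do.
        }

        private void sendCloseMessage() { /* send a WebSocket close frame */ }

        private void closeConnection() { /* release the network connection */ }
    }

Because each transition is a single atomic compareAndSet, exactly one thread wins each step, and the endpoint callbacks run without holding the SocketWrapper lock, which is what eliminates the deadlock described in the changelog entry below.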
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 33ef668c09..acf99d61c0 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -123,6 +123,11 @@
     </subsection>
     <subsection name="WebSocket">
       <changelog>
+        <fix>
+          <bug>66574</bug>: Refactor WebSocket session close to remove the lock
+          on the <code>SocketWrapper</code> which was a potential cause of
+          deadlocks if the application code used simulated blocking. (markt)
+        </fix>
         <fix>
           <bug>66575</bug>: Avoid unchecked use of the backing array of a buffer
           provided by the user in the compression transformation. (remm)


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org