This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new 6759f9e92b Fix BZ 66574 - refactor close to avoid possible deadlock 6759f9e92b is described below commit 6759f9e92b112056ab19e6cfc2291ae3137eb1b6 Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Apr 25 21:09:30 2023 +0100 Fix BZ 66574 - refactor close to avoid possible deadlock https://bz.apache.org/bugzilla/show_bug.cgi?id=66574 --- java/org/apache/tomcat/websocket/WsSession.java | 122 +++++++++++++----------- webapps/docs/changelog.xml | 5 + 2 files changed, 69 insertions(+), 58 deletions(-) diff --git a/java/org/apache/tomcat/websocket/WsSession.java b/java/org/apache/tomcat/websocket/WsSession.java index 890f4f9d8a..18d7704f39 100644 --- a/java/org/apache/tomcat/websocket/WsSession.java +++ b/java/org/apache/tomcat/websocket/WsSession.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.naming.NamingException; import javax.websocket.ClientEndpointConfig; @@ -105,7 +106,7 @@ public class WsSession implements Session { // Expected to handle message types of <ByteBuffer> only private volatile MessageHandler binaryMessageHandler = null; private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = null; - private volatile State state = State.OPEN; + private AtomicReference<State> state = new AtomicReference<>(State.OPEN); private final Map<String, Object> userProperties = new ConcurrentHashMap<>(); private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; @@ -541,12 +542,12 @@ public class WsSession implements Session { @Override public boolean isOpen() { - return state == State.OPEN; + return state.get() == State.OPEN; } public boolean isClosed() { - return state == State.CLOSED; + return state.get() == 
State.CLOSED; } @@ -646,46 +647,38 @@ public class WsSession implements Session { * @param closeSocket Should the socket be closed immediately rather than waiting for the server to respond */ public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal, boolean closeSocket) { - // Double-checked locking. OK because state is volatile - if (state != State.OPEN) { + + if (!state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING)) { + // Close process has already been started. Don't start it again. return; } - wsRemoteEndpoint.getLock().lock(); - try { - if (state != State.OPEN) { - return; - } - - if (log.isDebugEnabled()) { - log.debug(sm.getString("wsSession.doClose", id)); - } + if (log.isDebugEnabled()) { + log.debug(sm.getString("wsSession.doClose", id)); + } - // This will trigger a flush of any batched messages. - try { - wsRemoteEndpoint.setBatchingAllowed(false); - } catch (IOException e) { - log.warn(sm.getString("wsSession.flushFailOnClose"), e); - fireEndpointOnError(e); - } + // Flush any batched messages not yet sent. + try { + wsRemoteEndpoint.setBatchingAllowed(false); + } catch (IOException e) { + log.warn(sm.getString("wsSession.flushFailOnClose"), e); + fireEndpointOnError(e); + } + // Send the close message to the remote endpoint. + sendCloseMessage(closeReasonMessage); + fireEndpointOnClose(closeReasonLocal); + if (!state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED) || closeSocket) { /* - * If the flush above fails the error handling could call this method recursively. Without this check, the - * close message and notifications could be sent multiple times. + * A close message was received in another thread or this is handling an error condition. Either way, no + * further close message is expected to be received. Mark the session as fully closed... 
*/ - if (state != State.OUTPUT_CLOSED) { - state = State.OUTPUT_CLOSED; - - sendCloseMessage(closeReasonMessage); - if (closeSocket) { - wsRemoteEndpoint.close(); - } - fireEndpointOnClose(closeReasonLocal); - } - } finally { - wsRemoteEndpoint.getLock().unlock(); + state.set(State.CLOSED); + // ... and close the network connection. + wsRemoteEndpoint.close(); } + // Fail any uncompleted messages. IOException ioe = new IOException(sm.getString("wsSession.messageFailed")); SendResult sr = new SendResult(ioe); for (FutureToSendHandler f2sh : futures.keySet()) { @@ -701,29 +694,40 @@ public class WsSession implements Session { * @param closeReason The reason contained within the received close message. */ public void onClose(CloseReason closeReason) { + if (state.compareAndSet(State.OPEN, State.CLOSING)) { + // Standard close. - wsRemoteEndpoint.getLock().lock(); - try { - if (state != State.CLOSED) { - try { - wsRemoteEndpoint.setBatchingAllowed(false); - } catch (IOException e) { - log.warn(sm.getString("wsSession.flushFailOnClose"), e); - fireEndpointOnError(e); - } - if (state == State.OPEN) { - state = State.OUTPUT_CLOSED; - sendCloseMessage(closeReason); - fireEndpointOnClose(closeReason); - } - state = State.CLOSED; - - // Close the socket - wsRemoteEndpoint.close(); + // Flush any batched messages not yet sent. + try { + wsRemoteEndpoint.setBatchingAllowed(false); + } catch (IOException e) { + log.warn(sm.getString("wsSession.flushFailOnClose"), e); + fireEndpointOnError(e); } - } finally { - wsRemoteEndpoint.getLock().unlock(); + + // Send the close message response to the remote endpoint. + sendCloseMessage(closeReason); + fireEndpointOnClose(closeReason); + + // Mark the session as fully closed. 
+ state.set(State.CLOSED); + + // Close the network connection. + wsRemoteEndpoint.close(); + } else if (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) { + /* + * The local endpoint sent a close message at the same time as the remote endpoint.
The local close is + * still being processed. Update the state so the the local close process will also close the network + * connection once it has finished sending a close message. + */ + } else if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) { + /* + * The local endpoint sent the first close message. The remote endpoint has now responded with its own close + * message so mark the session as fully closed and close the network connection. + */ + wsRemoteEndpoint.close(); } + // CLOSING and CLOSED are NO-OPs } @@ -871,13 +875,13 @@ public class WsSession implements Session { // Always register the future. futures.put(f2sh, f2sh); - if (state == State.OPEN) { + if (isOpen()) { // The session is open. The future has been registered with the open // session. Normal processing continues. return; } - // The session is closed. The future may or may not have been registered + // The session is closing / closed. The future may or may not have been registered // in time for it to be processed during session closure. if (f2sh.isDone()) { @@ -887,7 +891,7 @@ public class WsSession implements Session { return; } - // The session is closed. The Future had not completed when last checked. + // The session is closing / closed. The Future had not completed when last checked. // There is a small timing window that means the Future may have been // completed since the last check. There is also the possibility that // the Future was not registered in time to be cleaned up during session @@ -1047,7 +1051,7 @@ public class WsSession implements Session { private void checkState() { - if (state == State.CLOSED) { + if (isClosed()) { /* * As per RFC 6455, a WebSocket connection is considered to be closed once a peer has sent and received a * WebSocket close frame. 
@@ -1058,7 +1062,9 @@ public class WsSession implements Session { private enum State { OPEN, + OUTPUT_CLOSING, OUTPUT_CLOSED, + CLOSING, CLOSED } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 56a28fbad0..f0a1bd06e0 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -123,6 +123,11 @@ </subsection> <subsection name="WebSocket"> <changelog> + <fix> + <bug>66574</bug>: Refactor WebSocket session close to remove the lock on + the <code>SocketWrapper</code> which was a potential cause of deadlocks + if the application code used simulated blocking. (markt) + </fix> <fix> <bug>66575</bug>: Avoid unchecked use of the backing array of a buffer provided by the user in the compression transformation. (remm) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org