This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 150171f0a8e06b4ce8a2beb618bf85a5bb5240d3 Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Apr 4 20:11:25 2023 +0100 Add a test case for BZ 66508 https://bz.apache.org/bugzilla/show_bug.cgi?id=66508 --- .../TestWsRemoteEndpointImplServerDeadlock.java | 219 +++++++++++++++++++++ 1 file changed, 219 insertions(+) diff --git a/test/org/apache/tomcat/websocket/server/TestWsRemoteEndpointImplServerDeadlock.java b/test/org/apache/tomcat/websocket/server/TestWsRemoteEndpointImplServerDeadlock.java new file mode 100644 index 0000000000..269f112849 --- /dev/null +++ b/test/org/apache/tomcat/websocket/server/TestWsRemoteEndpointImplServerDeadlock.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket.server; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import jakarta.websocket.ClientEndpoint; +import jakarta.websocket.ContainerProvider; +import jakarta.websocket.OnError; +import jakarta.websocket.OnMessage; +import jakarta.websocket.OnOpen; +import jakarta.websocket.Session; +import jakarta.websocket.WebSocketContainer; +import jakarta.websocket.server.ServerEndpointConfig; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import org.apache.catalina.Context; +import org.apache.catalina.servlets.DefaultServlet; +import org.apache.catalina.startup.Tomcat; +import org.apache.tomcat.websocket.WebSocketBaseTest; +import org.apache.tomcat.websocket.WsSession; + +/* + * https://bz.apache.org/bugzilla/show_bug.cgi?id=66508 + * + * If the client sends a close while the server waiting for the client before sending the rest of a message, the + * processing of the close from the client can hang until the sending of the message times out. + * + * This is packaged in a separate class to allow test specific parameterisation. + */ +@RunWith(Parameterized.class) +public class TestWsRemoteEndpointImplServerDeadlock extends WebSocketBaseTest { + + @Parameterized.Parameters(name = "{index}: useAsyncIO[{0}], sendOnContainerThread[{1}]") + public static Collection<Object[]> parameters() { + + List<Object[]> parameterSets = new ArrayList<>(); + + for (Boolean useAsyncIO : booleans) { + for (Boolean sendOnContainerThread : booleans) { + parameterSets.add(new Object[] { useAsyncIO, sendOnContainerThread }); + } + } + + return parameterSets; + } + + @Parameter(0) + public Boolean useAsyncIO; + + @Parameter(1) + public Boolean sendOnContainerThread; + + /* + * Statics used to pass state to instances that are configured and created by class name so there is no easy way to + * configure the created instances directly. + * + * Every component that uses these statics takes a local copy ASAP to avoid issues with previous test runs retaining + * references to the instance stored in the static and interfering with the current test run. + */ + private static volatile boolean initialSendOnContainerThread; + private static volatile CountDownLatch initialServerSendLatch; + private static volatile CountDownLatch initialClientReceiveLatch; + + @Test + public void testTemporaryDeadlockOnClientClose() throws Exception { + // Configure the statics + initialSendOnContainerThread = sendOnContainerThread.booleanValue(); + initialServerSendLatch = new CountDownLatch(1); + initialClientReceiveLatch = new CountDownLatch(1); + + // Local copies of the statics used in this method + CountDownLatch serverSendLatch = initialServerSendLatch; + CountDownLatch clientReceiveLatch = initialClientReceiveLatch; + + Tomcat tomcat = getTomcatInstance(); + Assert.assertTrue(tomcat.getConnector().setProperty("useAsyncIO", useAsyncIO.toString())); + + // No file system docBase required + Context ctx = tomcat.addContext("", null); + ctx.addApplicationListener(Bug66508Config.class.getName()); + Tomcat.addServlet(ctx, "default", new DefaultServlet()); + ctx.addServletMappingDecoded("/", "default"); + + WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer(); + + tomcat.start(); + + Bug66508Client client = new Bug66508Client(); + URI uri = new URI("ws://localhost:" + getPort() + Bug66508Config.PATH); + + Session session = wsContainer.connectToServer(client, uri); + // Server starts to send messages. + // Wait for server sending to block. + serverSendLatch.await(); + // Server buffers are full. Server cannot send any more messages. + // Server is now blocked waiting for the client to read the messages. + + // Close the session from the client + session.close(); + + // Wait for server to complete sending the close message + // This is the process that deadlocks when the bug is experienced + Field f = WsSession.class.getDeclaredField("state"); + f.setAccessible(true); + Object state = f.get(Bug66508Endpoint.serverSession); + int count = 0; + long start = System.nanoTime(); + while (!"CLOSED".equals(state.toString()) && count < 100) { + count++; + Thread.sleep(100); + state = f.get(Bug66508Endpoint.serverSession); + if (count == 10) { + // If deadlock is present, this should be long enough to trigger it. + // Release the client latch so it starts processing messages again else the server will never be able to + // send the close message. + clientReceiveLatch.countDown(); + } + } + long closeDelay = System.nanoTime() - start; + + Assert.assertTrue("Close delay was [" + closeDelay + "] ns", closeDelay < 10_000_000_000L); + + } + + public static class Bug66508Config extends TesterEndpointConfig { + + public static final String PATH = "/bug66508"; + + + @Override + protected ServerEndpointConfig getServerEndpointConfig() { + return ServerEndpointConfig.Builder.create(Bug66508Endpoint.class, PATH).build(); + } + } + + public static class Bug66508Endpoint { + + // 1024k message + private static final String MSG = "a".repeat(1024 * 8); + + private static volatile Session serverSession = null; + private CountDownLatch serverSendLatch = initialServerSendLatch; + private boolean sendOnContainerThread = initialSendOnContainerThread; + + @OnOpen + public void onOpen(Session session) { + serverSession = session; + // Send messages to the client until they appear to hang + // Need to do this on a non-container thread + Runnable r = () -> { + Future<Void> sendMessageFuture; + while (true) { + sendMessageFuture = session.getAsyncRemote().sendText(MSG); + try { + sendMessageFuture.get(2, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + break; + } + } + serverSendLatch.countDown(); + }; + if (sendOnContainerThread) { + r.run(); + } else { + new Thread(r).start(); + } + } + + @OnError + public void onError(@SuppressWarnings("unused") Throwable t) { + // Expected. Swallow the error. + } + } + + @ClientEndpoint + public static class Bug66508Client { + + private CountDownLatch clientReceiveLatch = initialClientReceiveLatch; + + @OnMessage + public void onMessage(@SuppressWarnings("unused") String msg) { + try { + // Block client from processing messages + clientReceiveLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org