This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push: new 0ba079c Additional fix for BZ 64848. Ensure Processor instances are cleaned up 0ba079c is described below commit 0ba079cbc7df9b928b022cc9f6837505d5ff3c0d Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Nov 2 14:52:24 2020 +0000 Additional fix for BZ 64848. Ensure Processor instances are cleaned up --- java/org/apache/coyote/AbstractProtocol.java | 27 ++++--- .../tomcat/websocket/TestWebSocketFrameClient.java | 4 +- .../websocket/TestWebSocketFrameClientSSL.java | 8 +- .../tomcat/websocket/TesterFirehoseServer.java | 87 +++++++++++++++++--- .../tomcat/websocket/server/TestSlowClient.java | 92 ++++++++++++++++++++++ webapps/docs/changelog.xml | 6 ++ 6 files changed, 195 insertions(+), 29 deletions(-) diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java index 28eb9e9..c226b72 100644 --- a/java/org/apache/coyote/AbstractProtocol.java +++ b/java/org/apache/coyote/AbstractProtocol.java @@ -40,7 +40,6 @@ import javax.servlet.http.HttpUpgradeHandler; import javax.servlet.http.WebConnection; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; -import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal; import org.apache.juli.logging.Log; import org.apache.tomcat.InstanceManager; import org.apache.tomcat.util.ExceptionUtils; @@ -408,6 +407,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, } + /* + * Primarily for debugging and testing. Could be exposed via JMX if + * considered useful. + */ + public int getWaitingProcessorCount() { + return waitingProcessors.size(); + } + + // ----------------------------------------------- Accessors for sub-classes protected AbstractEndpoint<S,?> getEndpoint() { @@ -1075,15 +1083,14 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, if (processor != null) { processor.recycle(); if (processor.isUpgrade()) { - // UpgradeProcessorInternal instances can utilise AsyncIO. - // If they do, the processor will not pass through the - // process() method and be removed from waitingProcessors - // so do that here. - if (processor instanceof UpgradeProcessorInternal) { - if (((UpgradeProcessorInternal) processor).hasAsyncIO()) { - getProtocol().removeWaitingProcessor(processor); - } - } + // While UpgradeProcessor instances should not normally be + // present in waitingProcessors there are various scenarios + // where this can happen. E.g.: + // - when AsyncIO is used + // - WebSocket I/O error on non-container thread + // Err on the side of caution and always try and remove any + // UpgradeProcessor instances from waitingProcessors + getProtocol().removeWaitingProcessor(processor); } else { // After recycling, only instances of UpgradeProcessorBase // will return true for isUpgrade(). diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java index 73fe483..9f5c730 100644 --- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java +++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java @@ -57,7 +57,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -80,7 +80,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("ws://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); CountDownLatch latch = new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT); BasicText handler = new BasicText(latch); diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java index ab12e0c..190a72e 100644 --- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java +++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java @@ -45,7 +45,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -64,7 +64,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("wss://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); CountDownLatch latch = new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT); BasicText handler = new BasicText(latch); @@ -92,7 +92,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -111,7 +111,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("wss://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); // Process incoming messages very slowly MessageHandler handler = new SleepingText(5000); diff --git a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java index 8b4b1ef..eb8ed63 100644 --- a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java +++ b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java @@ -41,6 +41,8 @@ public class TesterFirehoseServer { public static final int WAIT_TIME_MILLIS = 300000; public static final int SEND_TIME_OUT_MILLIS = 5000; + public static final String PATH = "/firehose"; + static { StringBuilder sb = new StringBuilder(MESSAGE_SIZE); for (int i = 0; i < MESSAGE_SIZE; i++) { @@ -50,22 +52,30 @@ public class TesterFirehoseServer { } - public static class Config extends TesterEndpointConfig { + public static class ConfigInline extends TesterEndpointConfig { + + @Override + protected Class<?> getEndpointClass() { + return EndpointInline.class; + } + } + - public static final String PATH = "/firehose"; + public static class ConfigThread extends TesterEndpointConfig { @Override protected Class<?> getEndpointClass() { - return Endpoint.class; + return EndpointThread.class; } } - @ServerEndpoint(Config.PATH) - public static class Endpoint { + public abstract static class Endpoint { + + private static final AtomicInteger openConnectionCount = new AtomicInteger(0); + private static final AtomicInteger errorCount = new AtomicInteger(0); - private static AtomicInteger openConnectionCount = new AtomicInteger(0); - private static AtomicInteger errorCount = new AtomicInteger(0); + private final boolean inline; private volatile boolean started = false; @@ -77,6 +87,10 @@ public class TesterFirehoseServer { return errorCount.intValue(); } + public Endpoint(boolean inline) { + this.inline = inline; + } + @OnOpen public void onOpen() { openConnectionCount.incrementAndGet(); @@ -98,6 +112,46 @@ public class TesterFirehoseServer { System.out.println("Received " + msg + ", now sending data"); + Writer writer = new Writer(session); + + if (inline) { + writer.doRun(); + } else { + Thread t = new Thread(writer); + t.start(); + } + } + + @OnError + public void onError(@SuppressWarnings("unused") Throwable t) { + errorCount.incrementAndGet(); + } + + @OnClose + public void onClose() { + openConnectionCount.decrementAndGet(); + } + } + + + private static class Writer implements Runnable { + + private final Session session; + + public Writer(Session session) { + this.session = session; + } + + @Override + public void run() { + try { + doRun(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + + public void doRun() throws IOException { session.getUserProperties().put( org.apache.tomcat.websocket.Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, Long.valueOf(SEND_TIME_OUT_MILLIS)); @@ -116,15 +170,22 @@ public class TesterFirehoseServer { // Flushing should happen automatically on session close session.close(); } + } - @OnError - public void onError(@SuppressWarnings("unused") Throwable t) { - errorCount.incrementAndGet(); + @ServerEndpoint(PATH) + public static class EndpointInline extends Endpoint { + + public EndpointInline() { + super(true); } + } - @OnClose - public void onClose() { - openConnectionCount.decrementAndGet(); + + @ServerEndpoint(PATH) + public static class EndpointThread extends Endpoint { + + public EndpointThread() { + super(false); } } } diff --git a/test/org/apache/tomcat/websocket/server/TestSlowClient.java b/test/org/apache/tomcat/websocket/server/TestSlowClient.java new file mode 100644 index 0000000..8c73608 --- /dev/null +++ b/test/org/apache/tomcat/websocket/server/TestSlowClient.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket.server; + +import java.net.URI; + +import jakarta.websocket.ClientEndpointConfig; +import jakarta.websocket.ContainerProvider; +import jakarta.websocket.MessageHandler; +import jakarta.websocket.Session; +import jakarta.websocket.WebSocketContainer; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.servlets.DefaultServlet; +import org.apache.catalina.startup.Tomcat; +import org.apache.coyote.AbstractProtocol; +import org.apache.tomcat.websocket.TesterFirehoseServer; +import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint; +import org.apache.tomcat.websocket.WebSocketBaseTest; + +public class TestSlowClient extends WebSocketBaseTest { + + @Test + public void testSendingFromAppThread() throws Exception { + Tomcat tomcat = getTomcatInstance(); + Context ctx = tomcat.addContext("", null); + // Server side endpoint that sends a stream of messages on a new thread + // in response to any message received. + ctx.addApplicationListener(TesterFirehoseServer.ConfigThread.class.getName()); + Tomcat.addServlet(ctx, "default", new DefaultServlet()); + ctx.addServletMappingDecoded("/", "default"); + + tomcat.start(); + + // WebSocket client + WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer(); + Session wsSession = wsContainer.connectToServer(TesterProgrammaticEndpoint.class, + ClientEndpointConfig.Builder.create().build(), new URI("ws://localhost:" + getPort() + TesterFirehoseServer.PATH)); + // Configure a handler designed to create a backlog causing the server + // side write to time out. + wsSession.addMessageHandler(new VerySlowHandler()); + + // Trigger the sending of the messages from the server + wsSession.getBasicRemote().sendText("start"); + + // Wait for server to close connection (it shouldn't) + // 20s should be long enough even for the slowest CI system. May need to + // extend this if not. + int count = 0; + while (wsSession.isOpen() && count < 200) { + Thread.sleep(100); + count++; + } + Assert.assertTrue(wsSession.isOpen()); + wsSession.close(); + + // BZ 64848 (non-container thread variant) + // Confirm there are no waiting processors + AbstractProtocol<?> protocol = (AbstractProtocol<?>) tomcat.getConnector().getProtocolHandler(); + Assert.assertEquals(0, protocol.getWaitingProcessorCount()); + } + + + public static class VerySlowHandler implements MessageHandler.Whole<String> { + + @Override + public void onMessage(String message) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // Ignore + } + } + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 19a4ef5..7b963a8 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -145,6 +145,12 @@ </update> </changelog> </subsection> + <subsection name="WebSocket"> + <changelog> + <bug>64848</bug>: Fix a variation of this memory leak when a write I/O + error occurs on a non-container thread. (markt) + </changelog> + </subsection> <subsection name="Web applications"> <changelog> <fix> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org