This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new b648088 Additional fix for BZ 64848. Ensure Processor instances are cleaned up b648088 is described below commit b648088ef9237bac54d53e6a70a2da6721531674 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Nov 2 14:52:24 2020 +0000 Additional fix for BZ 64848. Ensure Processor instances are cleaned up --- java/org/apache/coyote/AbstractProtocol.java | 30 +++++-- java/org/apache/tomcat/util/net/NioEndpoint.java | 1 + .../tomcat/websocket/TestWebSocketFrameClient.java | 4 +- .../websocket/TestWebSocketFrameClientSSL.java | 8 +- .../tomcat/websocket/TesterFirehoseServer.java | 87 +++++++++++++++++--- .../tomcat/websocket/server/TestSlowClient.java | 92 ++++++++++++++++++++++ webapps/docs/changelog.xml | 10 ++- 7 files changed, 205 insertions(+), 27 deletions(-) diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java index 247a5b0..a26f1ba 100644 --- a/java/org/apache/coyote/AbstractProtocol.java +++ b/java/org/apache/coyote/AbstractProtocol.java @@ -415,6 +415,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, } + /* + * Primarily for debugging and testing. Could be exposed via JMX if + * considered useful. + */ + public int getWaitingProcessorCount() { + return waitingProcessors.size(); + } + + // ----------------------------------------------- Accessors for sub-classes protected AbstractEndpoint<S> getEndpoint() { @@ -1010,12 +1019,21 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, private void release(Processor processor) { if (processor != null) { processor.recycle(); - // After recycling, only instances of UpgradeProcessorBase will - // return true for isUpgrade(). - // Instances of UpgradeProcessorBase should not be added to - // recycledProcessors since that pool is only for AJP or HTTP - // processors - if (!processor.isUpgrade()) { + if (processor.isUpgrade()) { + // While UpgradeProcessor instances should not normally be + // present in waitingProcessors there are various scenarios + // where this can happen. E.g.: + // - when AsyncIO is used + // - WebSocket I/O error on non-container thread + // Err on the side of caution and always try and remove any + // UpgradeProcessor instances from waitingProcessors + getProtocol().removeWaitingProcessor(processor); + } else { + // After recycling, only instances of UpgradeProcessorBase + // will return true for isUpgrade(). + // Instances of UpgradeProcessorBase should not be added to + // recycledProcessors since that pool is only for AJP or + // HTTP processors recycledProcessors.push(processor); if (getLog().isDebugEnabled()) { getLog().debug("Pushed Processor [" + processor + "]"); diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 08b8a4e..281c914 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -1258,6 +1258,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> { @Override public void close() throws IOException { getSocket().close(); + getEndpoint().getHandler().release(this); } diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java index c22f2ee..49f7597 100644 --- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java +++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java @@ -57,7 +57,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -83,7 +83,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("ws://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); CountDownLatch latch = new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT); BasicText handler = new BasicText(latch); diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java index 2e8b3b3..ad9366a 100644 --- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java +++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java @@ -51,7 +51,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -71,7 +71,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("wss://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); CountDownLatch latch = new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT); BasicText handler = new BasicText(latch); @@ -99,7 +99,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { Tomcat tomcat = getTomcatInstance(); // No file system docBase required Context ctx = tomcat.addContext("", null); - ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName()); + ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName()); Tomcat.addServlet(ctx, "default", new DefaultServlet()); ctx.addServletMappingDecoded("/", "default"); @@ -119,7 +119,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest { TesterProgrammaticEndpoint.class, clientEndpointConfig, new URI("wss://localhost:" + getPort() + - TesterFirehoseServer.Config.PATH)); + TesterFirehoseServer.PATH)); // Process incoming messages very slowly MessageHandler handler = new SleepingText(5000); diff --git a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java index 8b4b1ef..eb8ed63 100644 --- a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java +++ b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java @@ -41,6 +41,8 @@ public class TesterFirehoseServer { public static final int WAIT_TIME_MILLIS = 300000; public static final int SEND_TIME_OUT_MILLIS = 5000; + public static final String PATH = "/firehose"; + static { StringBuilder sb = new StringBuilder(MESSAGE_SIZE); for (int i = 0; i < MESSAGE_SIZE; i++) { @@ -50,22 +52,30 @@ public class TesterFirehoseServer { } - public static class Config extends TesterEndpointConfig { + public static class ConfigInline extends TesterEndpointConfig { + + @Override + protected Class<?> getEndpointClass() { + return EndpointInline.class; + } + } + - public static final String PATH = "/firehose"; + public static class ConfigThread extends TesterEndpointConfig { @Override protected Class<?> getEndpointClass() { - return Endpoint.class; + return EndpointThread.class; } } - @ServerEndpoint(Config.PATH) - public static class Endpoint { + public abstract static class Endpoint { + + private static final AtomicInteger openConnectionCount = new AtomicInteger(0); + private static final AtomicInteger errorCount = new AtomicInteger(0); - private static AtomicInteger openConnectionCount = new AtomicInteger(0); - private static AtomicInteger errorCount = new AtomicInteger(0); + private final boolean inline; private volatile boolean started = false; @@ -77,6 +87,10 @@ public class TesterFirehoseServer { return errorCount.intValue(); } + public Endpoint(boolean inline) { + this.inline = inline; + } + @OnOpen public void onOpen() { openConnectionCount.incrementAndGet(); @@ -98,6 +112,46 @@ public class TesterFirehoseServer { System.out.println("Received " + msg + ", now sending data"); + Writer writer = new Writer(session); + + if (inline) { + writer.doRun(); + } else { + Thread t = new Thread(writer); + t.start(); + } + } + + @OnError + public void onError(@SuppressWarnings("unused") Throwable t) { + errorCount.incrementAndGet(); + } + + @OnClose + public void onClose() { + openConnectionCount.decrementAndGet(); + } + } + + + private static class Writer implements Runnable { + + private final Session session; + + public Writer(Session session) { + this.session = session; + } + + @Override + public void run() { + try { + doRun(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + + public void doRun() throws IOException { session.getUserProperties().put( org.apache.tomcat.websocket.Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, Long.valueOf(SEND_TIME_OUT_MILLIS)); @@ -116,15 +170,22 @@ public class TesterFirehoseServer { // Flushing should happen automatically on session close session.close(); } + } - @OnError - public void onError(@SuppressWarnings("unused") Throwable t) { - errorCount.incrementAndGet(); + @ServerEndpoint(PATH) + public static class EndpointInline extends Endpoint { + + public EndpointInline() { + super(true); } + } - @OnClose - public void onClose() { - openConnectionCount.decrementAndGet(); + + @ServerEndpoint(PATH) + public static class EndpointThread extends Endpoint { + + public EndpointThread() { + super(false); } } } diff --git a/test/org/apache/tomcat/websocket/server/TestSlowClient.java b/test/org/apache/tomcat/websocket/server/TestSlowClient.java new file mode 100644 index 0000000..0978dbc --- /dev/null +++ b/test/org/apache/tomcat/websocket/server/TestSlowClient.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket.server; + +import java.net.URI; + +import javax.websocket.ClientEndpointConfig; +import javax.websocket.ContainerProvider; +import javax.websocket.MessageHandler; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.servlets.DefaultServlet; +import org.apache.catalina.startup.Tomcat; +import org.apache.coyote.AbstractProtocol; +import org.apache.tomcat.websocket.TesterFirehoseServer; +import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint; +import org.apache.tomcat.websocket.WebSocketBaseTest; + +public class TestSlowClient extends WebSocketBaseTest { + + @Test + public void testSendingFromAppThread() throws Exception { + Tomcat tomcat = getTomcatInstance(); + Context ctx = tomcat.addContext("", null); + // Server side endpoint that sends a stream of messages on a new thread + // in response to any message received. + ctx.addApplicationListener(TesterFirehoseServer.ConfigThread.class.getName()); + Tomcat.addServlet(ctx, "default", new DefaultServlet()); + ctx.addServletMappingDecoded("/", "default"); + + tomcat.start(); + + // WebSocket client + WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer(); + Session wsSession = wsContainer.connectToServer(TesterProgrammaticEndpoint.class, + ClientEndpointConfig.Builder.create().build(), new URI("ws://localhost:" + getPort() + TesterFirehoseServer.PATH)); + // Configure a handler designed to create a backlog causing the server + // side write to time out. + wsSession.addMessageHandler(new VerySlowHandler()); + + // Trigger the sending of the messages from the server + wsSession.getBasicRemote().sendText("start"); + + // Wait for server to close connection (it shouldn't) + // 20s should be long enough even for the slowest CI system. May need to + // extend this if not. + int count = 0; + while (wsSession.isOpen() && count < 200) { + Thread.sleep(100); + count++; + } + Assert.assertTrue(wsSession.isOpen()); + wsSession.close(); + + // BZ 64848 (non-container thread variant) + // Confirm there are no waiting processors + AbstractProtocol<?> protocol = (AbstractProtocol<?>) tomcat.getConnector().getProtocolHandler(); + Assert.assertEquals(0, protocol.getWaitingProcessorCount()); + } + + + public static class VerySlowHandler implements MessageHandler.Whole<String> { + + @Override + public void onMessage(String message) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // Ignore + } + } + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index f26a444..104444c 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -74,7 +74,7 @@ </fix> </changelog> </subsection> - <subseciton name="Coyote"> + <subsection name="Coyote"> <changelog> <fix> Refactor the HTTP/2 window update handling for padding in data frames to @@ -96,7 +96,13 @@ that did not have one. (markt) </add> </changelog> - </subseciton> + </subsection> + <subsection name="WebSocket"> + <changelog> + <bug>64848</bug>: Fix a variation of this memory leak when a write I/O + error occurs on a non-container thread. (markt) + </changelog> + </subsection> <subsection name="Web applications"> <changelog> <fix> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org