This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new 89d4960 Fix async timeouts with HTTP/2 89d4960 is described below commit 89d4960faf6106f8e4895b1b0e02c07176513dfc Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Dec 10 09:31:29 2019 +0000 Fix async timeouts with HTTP/2 --- java/org/apache/coyote/AbstractProtocol.java | 26 ++-- java/org/apache/coyote/LocalStrings.properties | 3 + .../coyote/http11/AbstractHttp11Protocol.java | 5 + java/org/apache/coyote/http2/Http2Protocol.java | 11 ++ java/org/apache/coyote/http2/StreamProcessor.java | 5 +- test/org/apache/coyote/http2/TestAsyncTimeout.java | 152 ++++++++++++++++----- webapps/docs/changelog.xml | 4 + 7 files changed, 162 insertions(+), 44 deletions(-) diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java index b32b546..b41ad32 100644 --- a/java/org/apache/coyote/AbstractProtocol.java +++ b/java/org/apache/coyote/AbstractProtocol.java @@ -391,11 +391,17 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, public void addWaitingProcessor(Processor processor) { + if (getLog().isDebugEnabled()) { + getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.add", processor)); + } waitingProcessors.add(processor); } public void removeWaitingProcessor(Processor processor) { + if (getLog().isDebugEnabled()) { + getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.remove", processor)); + } waitingProcessors.remove(processor); } @@ -804,11 +810,12 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, // OpenSSL typically returns null whereas JSSE typically // returns "" when no protocol is negotiated if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) { - UpgradeProtocol upgradeProtocol = - getProtocol().getNegotiatedProtocol(negotiatedProtocol); + UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { - 
processor = upgradeProtocol.getProcessor( - wrapper, getProtocol().getAdapter()); + processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter()); + if (getLog().isDebugEnabled()) { + getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor)); + } } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. @@ -821,9 +828,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, // replace the code below with the commented out // block. if (getLog().isDebugEnabled()) { - getLog().debug(sm.getString( - "abstractConnectionHandler.negotiatedProcessor.fail", - negotiatedProtocol)); + getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail", + negotiatedProtocol)); } return SocketState.CLOSED; /* @@ -840,13 +846,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler, if (processor == null) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { - getLog().debug(sm.getString("abstractConnectionHandler.processorPop", - processor)); + getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { processor = getProtocol().createProcessor(); register(processor); + if (getLog().isDebugEnabled()) { + getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor)); + } } processor.setSslSupport( diff --git a/java/org/apache/coyote/LocalStrings.properties b/java/org/apache/coyote/LocalStrings.properties index 4dddf71..b20d056 100644 --- a/java/org/apache/coyote/LocalStrings.properties +++ b/java/org/apache/coyote/LocalStrings.properties @@ -19,6 +19,7 @@ abstractConnectionHandler.ioexception.debug=IOExceptions are normal, ignored abstractConnectionHandler.negotiatedProcessor.fail=Failed to create Processor for negotiated protocol [{0}] abstractConnectionHandler.oome=Failed to complete processing of a request 
abstractConnectionHandler.process=Processing socket [{0}] with status [{1}] +abstractConnectionHandler.processorCreate=Created new processor [{0}] abstractConnectionHandler.processorPop=Popped processor [{0}] from cache abstractConnectionHandler.protocolexception.debug=ProtocolExceptions are normal, ignored abstractConnectionHandler.socketexception.debug=SocketExceptions are normal, ignored @@ -35,6 +36,8 @@ abstractProcessor.socket.ssl=Exception getting SSL attributes abstractProtocol.mbeanDeregistrationFailed=Failed to deregister MBean named [{0}] from MBean server [{1}] abstractProtocol.processorRegisterError=Error registering request processor abstractProtocol.processorUnregisterError=Error unregistering request processor +abstractProcotol.waitingProcerssor.add=Added processor [{0}] to waiting processors +abstractProcotol.waitingProcerssor.remove=Removed processor [{0}] from waiting processors abstractProtocolHandler.asyncTimeoutError=Error processing async timeouts abstractProtocolHandler.destroy=Destroying ProtocolHandler [{0}] diff --git a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java index ab2a482..e8a5b91 100644 --- a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java +++ b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java @@ -39,6 +39,7 @@ import org.apache.coyote.UpgradeToken; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal; import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal; +import org.apache.coyote.http2.Http2Protocol; import org.apache.tomcat.util.buf.StringUtils; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.SSLHostConfig; @@ -498,6 +499,10 @@ public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> { } } } + + if (upgradeProtocol instanceof Http2Protocol) { + ((Http2Protocol) 
upgradeProtocol).setHttp11Protocol(this); + } } @Override public UpgradeProtocol getNegotiatedProtocol(String negotiatedName) { diff --git a/java/org/apache/coyote/http2/Http2Protocol.java b/java/org/apache/coyote/http2/Http2Protocol.java index f11f2d1..f8ee7ec 100644 --- a/java/org/apache/coyote/http2/Http2Protocol.java +++ b/java/org/apache/coyote/http2/Http2Protocol.java @@ -34,6 +34,7 @@ import org.apache.coyote.Request; import org.apache.coyote.Response; import org.apache.coyote.UpgradeProtocol; import org.apache.coyote.UpgradeToken; +import org.apache.coyote.http11.AbstractHttp11Protocol; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal; import org.apache.tomcat.util.buf.StringUtils; @@ -91,6 +92,8 @@ public class Http2Protocol implements UpgradeProtocol { private boolean useSendfile = true; // Compression private final CompressionConfig compressionConfig = new CompressionConfig(); + // Reference to HTTP/1.1 protocol that this instance is configured under + private AbstractHttp11Protocol<?> http11Protocol = null; @Override public String getHttpUpgradeName(boolean isSSLEnabled) { @@ -418,4 +421,12 @@ public class Http2Protocol implements UpgradeProtocol { public boolean useCompression(Request request, Response response) { return compressionConfig.useCompression(request, response); } + + + public AbstractHttp11Protocol<?> getHttp11Protocol() { + return this.http11Protocol; + } + public void setHttp11Protocol(AbstractHttp11Protocol<?> http11Protocol) { + this.http11Protocol = http11Protocol; + } } diff --git a/java/org/apache/coyote/http2/StreamProcessor.java b/java/org/apache/coyote/http2/StreamProcessor.java index c550f19..99e2d78 100644 --- a/java/org/apache/coyote/http2/StreamProcessor.java +++ b/java/org/apache/coyote/http2/StreamProcessor.java @@ -71,7 +71,10 @@ class StreamProcessor extends AbstractProcessor { try { state = process(socketWrapper, event); - if (state == 
SocketState.CLOSED) { + if (state == SocketState.LONG) { + handler.getProtocol().getHttp11Protocol().addWaitingProcessor(this); + } else if (state == SocketState.CLOSED) { + handler.getProtocol().getHttp11Protocol().removeWaitingProcessor(this); if (!getErrorState().isConnectionIoAllowed()) { ConnectionException ce = new ConnectionException(sm.getString( "streamProcessor.error.connection", stream.getConnectionId(), diff --git a/test/org/apache/coyote/http2/TestAsyncTimeout.java b/test/org/apache/coyote/http2/TestAsyncTimeout.java index 70a5348..1e97490 100644 --- a/test/org/apache/coyote/http2/TestAsyncTimeout.java +++ b/test/org/apache/coyote/http2/TestAsyncTimeout.java @@ -19,6 +19,8 @@ package org.apache.coyote.http2; import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.AsyncContext; @@ -29,7 +31,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.apache.catalina.Context; @@ -38,7 +39,6 @@ import org.apache.catalina.startup.Tomcat; public class TestAsyncTimeout extends Http2TestBase { - @Ignore // Until this HTTP/2 + async timeouts is fixed @Test public void testTimeout() throws Exception { enableHttp2(); @@ -46,9 +46,16 @@ public class TestAsyncTimeout extends Http2TestBase { Tomcat tomcat = getTomcatInstance(); Context ctxt = tomcat.addContext("", null); + // This is the target of the HTTP/2 upgrade request Tomcat.addServlet(ctxt, "simple", new SimpleServlet()); ctxt.addServletMappingDecoded("/simple", "simple"); - Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncTimeoutServlet()); + + // This is the servlet that does the actual test + // This latch is used to signal that the async thread used by the test + // has ended. 
It isn't essential to the test but it allows the test to + // complete without Tomcat logging an error about a still running thread. + CountDownLatch latch = new CountDownLatch(1); + Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncTimeoutServlet(latch)); w.setAsyncSupported(true); ctxt.addServletMappingDecoded("/async", "async"); tomcat.start(); @@ -58,9 +65,11 @@ public class TestAsyncTimeout extends Http2TestBase { sendClientPreface(); validateHttp2InitialResponse(); - // Reset connection window size after intial response + // Reset connection window size after initial response sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH); + // Include the response body in the trace so we can check for the PASS / + // FAIL text. output.setTraceBody(true); // Send request @@ -74,9 +83,11 @@ public class TestAsyncTimeout extends Http2TestBase { // Body parser.readFrame(true); - // Check that the right number of bytes were received + // Check that the expected text was received String trace = output.getTrace(); Assert.assertTrue(trace, trace.contains("PASS")); + + latch.await(10, TimeUnit.SECONDS); } @@ -84,59 +95,132 @@ public class TestAsyncTimeout extends Http2TestBase { private static final long serialVersionUID = 1L; + private final CountDownLatch latch; + + public AsyncTimeoutServlet(CountDownLatch latch) { + this.latch = latch; + } + @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // The idea of this test is that the timeout kicks in after 2 + // seconds and stops the async thread early rather than letting it + // complete the full 5 seconds of processing. 
final AsyncContext asyncContext = request.startAsync(); response.setStatus(HttpServletResponse.SC_OK); response.setContentType("text/plain"); response.setCharacterEncoding("UTF-8"); - asyncContext.addListener(new AsyncListener() { + // Only want to call complete() once (else we get stack traces in + // the logs so use this to track when complete() is called). + AtomicBoolean completeCalled = new AtomicBoolean(false); + Ticker ticker = new Ticker(asyncContext, completeCalled); + TimeoutListener listener = new TimeoutListener(latch, ticker, completeCalled); + asyncContext.addListener(listener); + asyncContext.setTimeout(2000); + ticker.start(); + } + } - AtomicBoolean ended = new AtomicBoolean(false); - @Override - public void onTimeout(AsyncEvent event) throws IOException { - if (ended.compareAndSet(false, true)) { - PrintWriter pw = event.getAsyncContext().getResponse().getWriter(); - pw.write("PASS"); - pw.flush(); - event.getAsyncContext().complete(); - } - } + private static class Ticker extends Thread { - @Override - public void onStartAsync(AsyncEvent event) throws IOException { - // NO-OP - } + private final AsyncContext asyncContext; + private final AtomicBoolean completeCalled; + private volatile boolean running = true; + + public Ticker(AsyncContext asyncContext, AtomicBoolean completeCalled) { + this.asyncContext = asyncContext; + this.completeCalled = completeCalled; + } + + public void end() { + running = false; + } - @Override - public void onError(AsyncEvent event) throws IOException { - // NO-OP + @Override + public void run() { + try { + PrintWriter pw = asyncContext.getResponse().getWriter(); + int counter = 0; + + // If the test works running will be set to false before + // counter reaches 50. + while (running && counter < 50) { + Thread.sleep(100); + counter++; + pw.print("Tick " + counter); } + // Need to call complete() here if the test fails but complete() + // should have been called by the listener. 
Use the flag to make + // sure we only call complete once. + if (completeCalled.compareAndSet(false, true)) { + asyncContext.complete(); + } + } catch (IOException | InterruptedException e) { + // Ignore + } + } + } + + + private static class TimeoutListener implements AsyncListener { - @Override - public void onComplete(AsyncEvent event) throws IOException { - if (ended.compareAndSet(false, true)) { - PrintWriter pw = event.getAsyncContext().getResponse().getWriter(); - pw.write("FAIL"); - pw.flush(); - } + private final AtomicBoolean ended = new AtomicBoolean(false); + private final CountDownLatch latch; + private final Ticker ticker; + private final AtomicBoolean completeCalled; + + public TimeoutListener(CountDownLatch latch, Ticker ticker, AtomicBoolean completeCalled) { + this.latch = latch; + this.ticker = ticker; + this.completeCalled = completeCalled; + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + ticker.end(); + if (ended.compareAndSet(false, true)) { + PrintWriter pw = event.getAsyncContext().getResponse().getWriter(); + pw.write("PASS"); + pw.flush(); + // If the timeout fires we should always need to call complete() + // here but use the flag to be safe. + if (completeCalled.compareAndSet(false, true)) { + event.getAsyncContext().complete(); } - }); + } + } + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + // NO-OP + } - asyncContext.setTimeout(2000); + @Override + public void onError(AsyncEvent event) throws IOException { + // NO-OP + } + @Override + public void onComplete(AsyncEvent event) throws IOException { + if (ended.compareAndSet(false, true)) { + PrintWriter pw = event.getAsyncContext().getResponse().getWriter(); + pw.write("FAIL"); + pw.flush(); + } try { - Thread.sleep(4000); + // Wait for the async thread to end before we signal that the + // test is complete. This avoids logging an exception about a + // still running thread when the unit test shuts down. 
+ ticker.join(); + latch.countDown(); } catch (InterruptedException e) { // Ignore } - asyncContext.complete(); } } } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 5e48ebf..bf24670 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -50,6 +50,10 @@ <update> Simplify NIO blocking read and write. (remm) </update> + <fix> + Ensure that Servlet Asynchronous processing timeouts fire when requests + are made using HTTP/2. (markt) + </fix> </changelog> </subsection> </section> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org