This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new e1fb169adb Align async close implementation with Servlet 6.1 clarification e1fb169adb is described below commit e1fb169adb953e826fecb4b15a178636aa7c3a13 Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Dec 13 11:22:51 2022 +0000 Align async close implementation with Servlet 6.1 clarification Includes a test case --- java/org/apache/coyote/AbstractProcessor.java | 8 +- java/org/apache/coyote/AbstractProcessorLight.java | 2 +- java/org/apache/coyote/AsyncStateMachine.java | 9 +- .../apache/coyote/http11/Http11OutputBuffer.java | 9 +- .../catalina/nonblocking/TestNonBlockingAPI.java | 157 +++++++++++++++++++++ webapps/docs/changelog.xml | 11 +- 6 files changed, 182 insertions(+), 14 deletions(-) diff --git a/java/org/apache/coyote/AbstractProcessor.java b/java/org/apache/coyote/AbstractProcessor.java index 50743ffece..6d3be069c7 100644 --- a/java/org/apache/coyote/AbstractProcessor.java +++ b/java/org/apache/coyote/AbstractProcessor.java @@ -193,7 +193,7 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement @Override - public SocketState asyncPostProcess() { + public SocketState asyncPostProcess() throws IOException { return asyncStateMachine.asyncPostProcess(); } @@ -570,7 +570,11 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement break; } case ASYNC_POST_PROCESS: { - asyncStateMachine.asyncPostProcess(); + try { + asyncStateMachine.asyncPostProcess(); + } catch (IOException e) { + handleIOException(e); + } break; } diff --git a/java/org/apache/coyote/AbstractProcessorLight.java b/java/org/apache/coyote/AbstractProcessorLight.java index 709530dfc8..9ba345fa66 100644 --- a/java/org/apache/coyote/AbstractProcessorLight.java +++ b/java/org/apache/coyote/AbstractProcessorLight.java @@ -192,7 +192,7 @@ public abstract class AbstractProcessorLight implements Processor { */ protected abstract SocketState dispatch(SocketEvent status) throws IOException; - protected abstract SocketState asyncPostProcess(); + protected abstract SocketState asyncPostProcess() throws IOException; protected abstract Log getLog(); } diff --git a/java/org/apache/coyote/AsyncStateMachine.java b/java/org/apache/coyote/AsyncStateMachine.java index b400788831..50b16316a9 100644 --- a/java/org/apache/coyote/AsyncStateMachine.java +++ b/java/org/apache/coyote/AsyncStateMachine.java @@ -16,6 +16,7 @@ */ package org.apache.coyote; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import org.apache.juli.logging.Log; @@ -264,7 +265,7 @@ class AsyncStateMachine { * current state. For example, as per SRV.2.3.3.3 can now process calls to * complete() or dispatch(). */ - synchronized SocketState asyncPostProcess() { + synchronized SocketState asyncPostProcess() throws IOException { if (state == AsyncState.COMPLETE_PENDING) { clearNonBlockingListeners(); updateState(AsyncState.COMPLETING); @@ -277,6 +278,9 @@ class AsyncStateMachine { updateState(AsyncState.STARTED); return SocketState.LONG; } else if (state == AsyncState.MUST_COMPLETE || state == AsyncState.COMPLETING) { + if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) { + return SocketState.LONG; + } asyncCtxt.fireOnComplete(); updateState(AsyncState.DISPATCHED); asyncCtxt.decrementInProgressAsyncCount(); @@ -285,6 +289,9 @@ class AsyncStateMachine { updateState(AsyncState.DISPATCHING); return SocketState.ASYNC_END; } else if (state == AsyncState.DISPATCHING) { + if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) { + return SocketState.LONG; + } updateState(AsyncState.DISPATCHED); asyncCtxt.decrementInProgressAsyncCount(); return SocketState.ASYNC_END; diff --git a/java/org/apache/coyote/http11/Http11OutputBuffer.java b/java/org/apache/coyote/http11/Http11OutputBuffer.java index 570b90c0d7..e7206a50bd 100644 --- a/java/org/apache/coyote/http11/Http11OutputBuffer.java +++ b/java/org/apache/coyote/http11/Http11OutputBuffer.java @@ -565,14 +565,7 @@ public class Http11OutputBuffer implements HttpOutputBuffer { @Override public void end() throws IOException { - /* - * TODO - * As of Servlet 6.1, this flush is (currently) meant to be - * non-blocking if the output stream is in non-blocking mode. That - * requirement creates various complications I want to discuss with - * the EG before I try implementing it. - */ - socketWrapper.flush(true); + socketWrapper.flush(response.getWriteListener() == null); } @Override diff --git a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java index 0609eb05b6..018742394c 100644 --- a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java +++ b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java @@ -1697,4 +1697,161 @@ public class TestNonBlockingAPI extends TomcatBaseTest { // NO-OP } } + + + @Test + public void testNonBlockingWriteWithClose() throws Exception { + AtomicBoolean asyncContextIsComplete = new AtomicBoolean(false); + AtomicBoolean asyncContextIsError = new AtomicBoolean(false); + + CountDownLatch beforeCloseLatch = new CountDownLatch(1); + CountDownLatch afterCloseLatch = new CountDownLatch(1); + + AtomicInteger written = new AtomicInteger(-1); + + Tomcat tomcat = getTomcatInstance(); + // Note: Low values of socket.txBufSize can trigger very poor + // performance. + Assert.assertTrue(tomcat.getConnector().setProperty("socket.txBufSize", "524228")); + + // No file system docBase required + Context ctx = tomcat.addContext("", null); + + TesterAccessLogValve alv = new TesterAccessLogValve(); + ctx.getPipeline().addValve(alv); + + NBWriteWithCloseServlet servlet = new NBWriteWithCloseServlet( + asyncContextIsComplete, asyncContextIsError, beforeCloseLatch, afterCloseLatch, written); + String servletName = NBWriteWithCloseServlet.class.getName(); + Tomcat.addServlet(ctx, servletName, servlet); + ctx.addServletMappingDecoded("/", servletName); + + tomcat.start(); + + SocketFactory factory = SocketFactory.getDefault(); + Socket s = factory.createSocket("localhost", getPort()); + + OutputStream os = s.getOutputStream(); + os.write(("GET / HTTP/1.1\r\n" + + "Host: localhost:" + getPort() + "\r\n" + + "Connection: close\r\n" + + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + // Wait for Servlet to fill write buffer + beforeCloseLatch.await(); + // Close should return immediately + long start = System.nanoTime(); + afterCloseLatch.await(); + long duration = System.nanoTime() - start; + + Assert.assertTrue("Close took [" + duration + "] ns", duration < 1_000_000_000); + + // Read the body + InputStream is = s.getInputStream(); + int read = 0; + byte[] buffer = new byte[8192]; + do { + read = is.read(buffer); + } while (read != -1); + + os.close(); + is.close(); + s.close(); + + Assert.assertTrue(asyncContextIsComplete.get()); + } + + + @WebServlet(asyncSupported = true) + public static class NBWriteWithCloseServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + private final AtomicBoolean asyncContextIsComplete; + private final AtomicBoolean asyncContextIsError; + private final CountDownLatch beforeCloseLatch; + private final CountDownLatch afterCloseLatch; + private final AtomicInteger written; + + public NBWriteWithCloseServlet(AtomicBoolean asyncContextIsComplete, AtomicBoolean asyncContextIsError, + CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch, AtomicInteger written) { + this.asyncContextIsComplete = asyncContextIsComplete; + this.asyncContextIsError = asyncContextIsError; + this.beforeCloseLatch = beforeCloseLatch; + this.afterCloseLatch = afterCloseLatch; + this.written = written; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + AsyncContext actx = req.startAsync(); + actx.setTimeout(Long.MAX_VALUE); + actx.addListener(new AsyncListener() { + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + log.info("onTimeout"); + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + log.info("onStartAsync"); + } + + @Override + public void onError(AsyncEvent event) throws IOException { + log.info("AsyncListener.onError"); + asyncContextIsError.set(true); + } + + @Override + public void onComplete(AsyncEvent event) throws IOException { + log.info("onComplete"); + asyncContextIsComplete.set(true); + } + }); + + // Write until buffer is full + ServletOutputStream out = resp.getOutputStream(); + TestWriteListener03 writeListener = new TestWriteListener03(actx, beforeCloseLatch, afterCloseLatch); + out.setWriteListener(writeListener); + + written.set(writeListener.written); + } + } + + + private static class TestWriteListener03 implements WriteListener { + private final AsyncContext ctx; + private final CountDownLatch beforeCloseLatch; + private final CountDownLatch afterCloseLatch; + int written = 0; + + public TestWriteListener03(AsyncContext ctx, CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch) { + this.ctx = ctx; + this.beforeCloseLatch = beforeCloseLatch; + this.afterCloseLatch = afterCloseLatch; + } + + @Override + public void onWritePossible() throws IOException { + if (written == 0) { + // Write until the buffer is full and then close the stream + while (ctx.getResponse().getOutputStream().isReady()) { + ctx.getResponse().getOutputStream().write(DATA, written, CHUNK_SIZE); + written += CHUNK_SIZE; + } + beforeCloseLatch.countDown(); + ctx.getResponse().getOutputStream().close(); + afterCloseLatch.countDown(); + } else { + ctx.complete(); + } + } + + @Override + public void onError(Throwable throwable) { + log.info("WriteListener.onError"); + throwable.printStackTrace(); + } + } } \ No newline at end of file diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index ed849ea3e8..bb3e009784 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -144,10 +144,17 @@ servlets. (markt) </fix> <fix> - Implement clarification from Jakarta Servlet project that Servlets - mapped to the context root should be mapped for requests to the + Implement the clarification from the Jakarta Servlet project that + Servlets mapped to the context root should be mapped for requests to the context root with or without the trailing <code>/</code>. (markt) </fix> + <fix> + Implement the clarification from the Jakarta Servlet project that + calling <code>ServletOutputStream.close()</code> on a stream in + non-blocking mode returns immediately with the stream effectively closed + and any data remaining to be written is written in the background by the + container. (markt) + </fix> </changelog> </subsection> <subsection name="Coyote"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org