This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new 6f354d0 Sync NIO2 IO code with trunk 6f354d0 is described below commit 6f354d040d5f170e51e12548edd46b6017f1465f Author: remm <r...@apache.org> AuthorDate: Thu May 9 16:29:45 2019 +0200 Sync NIO2 IO code with trunk --- .../catalina/security/SecurityClassLoad.java | 6 ++ .../apache/tomcat/util/net/AbstractEndpoint.java | 13 ++++ java/org/apache/tomcat/util/net/Nio2Endpoint.java | 69 ++++++++++--------- .../apache/tomcat/util/net/SecureNio2Channel.java | 12 ++-- .../apache/tomcat/util/net/SocketWrapperBase.java | 80 ++++++++++++++++++---- webapps/docs/changelog.xml | 16 +++++ 6 files changed, 145 insertions(+), 51 deletions(-) diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java index 9caf2b8..a76515d 100644 --- a/java/org/apache/catalina/security/SecurityClassLoad.java +++ b/java/org/apache/catalina/security/SecurityClassLoad.java @@ -179,6 +179,12 @@ public final class SecurityClassLoad { loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd"); loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel"); loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove"); + loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$OperationState"); + loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$VectoredIOCompletionHandler"); + loader.loadClass(basePackage + "util.net.SocketWrapperBase$BlockingMode"); + loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionCheck"); + loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionHandlerCall"); + loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionState"); // security loader.loadClass(basePackage + "util.security.PrivilegedGetTccl"); loader.loadClass(basePackage + "util.security.PrivilegedSetTccl"); diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index 603bb47..2bc782e 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -152,6 +152,11 @@ public abstract class AbstractEndpoint<S> { private static final int MAX_ERROR_DELAY = 1600; + public static long toTimeout(long timeout) { + // Many calls can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0 + return (timeout > 0) ? timeout : Long.MAX_VALUE; + } + // ----------------------------------------------------------------- Fields /** @@ -740,6 +745,14 @@ public abstract class AbstractEndpoint<S> { public boolean getDaemon() { return daemon; } + /** + * Expose asynchronous IO capability. + */ + private boolean useAsyncIO = true; + public void setUseAsyncIO(boolean useAsyncIO) { this.useAsyncIO = useAsyncIO; } + public boolean getUseAsyncIO() { return useAsyncIO; } + + protected abstract boolean getDeferAccept(); diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index ca94fe0..68c4136 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -31,6 +31,8 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -47,6 +49,9 @@ import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.collections.SynchronizedStack; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState; import org.apache.tomcat.util.net.jsse.JSSESupport; /** @@ -519,7 +524,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } } - getSocket().write(buffer, toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, attachment, this); + getSocket().write(buffer, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, attachment, this); } @Override @@ -599,11 +604,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { // Continue writing data using a gathering write ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment); getSocket().write(array, 0, array.length, - toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, + toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); } else if (attachment.hasRemaining()) { // Regular write - getSocket().write(attachment, toNio2Timeout(getWriteTimeout()), + getSocket().write(attachment, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, attachment, writeCompletionHandler); } else { // All data has been written @@ -649,7 +654,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { // Continue writing data using a gathering write ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment); getSocket().write(array, 0, array.length, - toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, + toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); } else { // All data has been written @@ -928,7 +933,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { @Override public boolean hasAsyncIO() { - return true; + return getEndpoint().getUseAsyncIO(); } /** @@ -976,11 +981,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { return CompletionState.ERROR; } if (timeout == -1) { - timeout = toNio2Timeout(getReadTimeout()); + timeout = toTimeout(getReadTimeout()); } // Disable any regular read notifications caused by registerReadInterest readNotify = true; - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!readPending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -992,7 +997,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } else { if (!readPending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new ReadPendingException(), attachment); + return CompletionState.ERROR; + } } } OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, @@ -1045,11 +1055,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { return CompletionState.ERROR; } if (timeout == -1) { - timeout = toNio2Timeout(getWriteTimeout()); + timeout = toTimeout(getWriteTimeout()); } // Disable any regular write notifications caused by registerWriteInterest writeNotify = true; - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -1061,7 +1071,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } else { if (!writePending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new WritePendingException(), attachment); + return CompletionState.ERROR; + } } } if (!socketBufferHandler.isWriteBufferEmpty()) { @@ -1109,15 +1124,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { boolean complete = true; boolean completion = true; if (state.check != null) { - switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { - case CONTINUE: + CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length); + if (call == CompletionHandlerCall.CONTINUE) { complete = false; - break; - case DONE: - break; - case NONE: + } else if (call == CompletionHandlerCall.NONE) { completion = false; - break; } } if (complete) { @@ -1150,13 +1161,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } @Override public void failed(Throwable exc, OperationState<A> state) { - IOException ioe; + IOException ioe = null; if (exc instanceof InterruptedByTimeoutException) { ioe = new SocketTimeoutException(); + exc = ioe; } else if (exc instanceof IOException) { ioe = (IOException) exc; - } else { - ioe = new IOException(exc); } setError(ioe); boolean notify = false; @@ -1167,7 +1177,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; } if (state.handler != null) { - state.handler.failed(ioe, state.attachment); + state.handler.failed(exc, state.attachment); } if (notify) { synchronized (state) { @@ -1220,7 +1230,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } else { Nio2Endpoint.startInline(); - getSocket().read(to, toNio2Timeout(getReadTimeout()), TimeUnit.MILLISECONDS, to, + getSocket().read(to, toTimeout(getReadTimeout()), TimeUnit.MILLISECONDS, to, readCompletionHandler); Nio2Endpoint.endInline(); if (readPending.availablePermits() == 1) { @@ -1361,7 +1371,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { // Before doing a blocking flush, make sure that any pending non // blocking write has completed. try { - if (writePending.tryAcquire(toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS)) { + if (writePending.tryAcquire(toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS)) { writePending.release(); } else { throw new SocketTimeoutException(); @@ -1388,13 +1398,13 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { if (!nonBlockingWriteBuffer.isEmpty()) { ByteBuffer[] array = nonBlockingWriteBuffer.toArray(socketBufferHandler.getWriteBuffer()); Nio2Endpoint.startInline(); - getSocket().write(array, 0, array.length, toNio2Timeout(getWriteTimeout()), + getSocket().write(array, 0, array.length, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); Nio2Endpoint.endInline(); } else if (socketBufferHandler.getWriteBuffer().hasRemaining()) { // Regular write Nio2Endpoint.startInline(); - getSocket().write(socketBufferHandler.getWriteBuffer(), toNio2Timeout(getWriteTimeout()), + getSocket().write(socketBufferHandler.getWriteBuffer(), toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, socketBufferHandler.getWriteBuffer(), writeCompletionHandler); Nio2Endpoint.endInline(); @@ -1557,7 +1567,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { data.length -= nRead; getSocket().getBufHandler().configureWriteBufferForRead(); Nio2Endpoint.startInline(); - getSocket().write(buffer, toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, + getSocket().write(buffer, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, data, sendfileHandler); Nio2Endpoint.endInline(); if (data.doneInline) { @@ -1697,11 +1707,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } - public static long toNio2Timeout(long timeout) { - // NIO2 can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0 - return (timeout > 0) ? timeout : Long.MAX_VALUE; - } - public static void startInline() { inlineCompletion.set(Boolean.TRUE); } diff --git a/java/org/apache/tomcat/util/net/SecureNio2Channel.java b/java/org/apache/tomcat/util/net/SecureNio2Channel.java index 6eae95d..cb670ae 100644 --- a/java/org/apache/tomcat/util/net/SecureNio2Channel.java +++ b/java/org/apache/tomcat/util/net/SecureNio2Channel.java @@ -256,7 +256,7 @@ public class SecureNio2Channel extends Nio2Channel { return 0; } else { if (async) { - sc.write(netOutBuffer, Nio2Endpoint.toNio2Timeout(timeout), + sc.write(netOutBuffer, AbstractEndpoint.toTimeout(timeout), TimeUnit.MILLISECONDS, socket, handshakeWriteCompletionHandler); } else { try { @@ -294,7 +294,7 @@ public class SecureNio2Channel extends Nio2Channel { if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || netOutBuffer.remaining() > 0) { //should actually return OP_READ if we have NEED_UNWRAP if (async) { - sc.write(netOutBuffer, Nio2Endpoint.toNio2Timeout(timeout), + sc.write(netOutBuffer, AbstractEndpoint.toTimeout(timeout), TimeUnit.MILLISECONDS, socket, handshakeWriteCompletionHandler); } else { try { @@ -326,7 +326,7 @@ public class SecureNio2Channel extends Nio2Channel { } //read more data if (async) { - sc.read(netInBuffer, Nio2Endpoint.toNio2Timeout(timeout), + sc.read(netInBuffer, AbstractEndpoint.toTimeout(timeout), TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler); } else { try { @@ -371,7 +371,7 @@ public class SecureNio2Channel extends Nio2Channel { // an optimisation for the typical case so we don't create an // SNIExtractor only to discover there is no data to process if (netInBuffer.position() == 0) { - sc.read(netInBuffer, Nio2Endpoint.toNio2Timeout(endpoint.getConnectionTimeout()), + sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()), TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler); return 1; } @@ -387,7 +387,7 @@ public class SecureNio2Channel extends Nio2Channel { Integer.toString(newLimit))); netInBuffer = ByteBufferUtils.expand(netInBuffer, newLimit); - sc.read(netInBuffer, Nio2Endpoint.toNio2Timeout(endpoint.getConnectionTimeout()), + sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()), TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler); return 1; } @@ -405,7 +405,7 @@ public class SecureNio2Channel extends Nio2Channel { clientRequestedCiphers = extractor.getClientRequestedCiphers(); break; case NEED_READ: - sc.read(netInBuffer, Nio2Endpoint.toNio2Timeout(endpoint.getConnectionTimeout()), + sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()), TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler); return 1; case UNDERFLOW: diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index c3097d6..b3d7e1e 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -26,6 +26,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionCheck; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState; import org.apache.tomcat.util.res.StringManager; public abstract class SocketWrapperBase<E> { @@ -791,7 +795,12 @@ public abstract class SocketWrapperBase<E> { public enum BlockingMode { /** - * The operation will now block. If there are pending operations, + * The operation will not block. If there are pending operations, + * the operation will throw a pending exception. + */ + CLASSIC, + /** + * The operation will not block. If there are pending operations, * the operation will return CompletionState.NOT_DONE. */ NON_BLOCK, @@ -875,7 +884,7 @@ public abstract class SocketWrapperBase<E> { public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) { for (int i = 0; i < length; i++) { - if (buffers[offset + i].remaining() > 0) { + if (buffers[offset + i].hasRemaining()) { return CompletionHandlerCall.CONTINUE; } } @@ -893,7 +902,7 @@ public abstract class SocketWrapperBase<E> { public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) { for (int i = 0; i < length; i++) { - if (buffers[offset + i].remaining() > 0) { + if (buffers[offset + i].hasRemaining()) { return CompletionHandlerCall.CONTINUE; } } @@ -916,6 +925,20 @@ public abstract class SocketWrapperBase<E> { }; /** + * This utility CompletionCheck will cause the completion handler + * to be called once the given buffers are full. The completion + * handler will then be called. + */ + public static final CompletionCheck COMPLETE_READ_WITH_COMPLETION = COMPLETE_WRITE_WITH_COMPLETION; + + /** + * This utility CompletionCheck will cause the completion handler + * to be called once the given buffers are full. If the operation + * completes inline, the completion handler will not be called. + */ + public static final CompletionCheck COMPLETE_READ = COMPLETE_WRITE; + + /** * Allows using NIO2 style read/write only for connectors that can * efficiently support it. * @@ -973,12 +996,27 @@ public abstract class SocketWrapperBase<E> { return true; } - @Deprecated - public final <A> CompletionState read(boolean block, long timeout, - TimeUnit unit, A attachment, CompletionCheck check, + /** + * Scatter read. The completion handler will be called once some + * data has been read or an error occurred. The default NIO2 + * behavior is used: the completion handler will be called as soon + * as some data has been read, even if the read has completed inline. + * + * @param timeout timeout duration for the read + * @param unit units for the timeout duration + * @param attachment an object to attach to the I/O operation that will be + * used when calling the completion handler + * @param handler to call when the IO is complete + * @param dsts buffers + * @param <A> The attachment type + * @return the completion state (done, done inline, or still pending) + */ + public final <A> CompletionState read(long timeout, TimeUnit unit, A attachment, CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) { - return read(block ? BlockingMode.BLOCK : BlockingMode.NON_BLOCK, - timeout, unit, attachment, check, handler, dsts); + if (dsts == null) { + throw new IllegalArgumentException(); + } + return read(dsts, 0, dsts.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler); } /** @@ -1038,12 +1076,28 @@ public abstract class SocketWrapperBase<E> { throw new UnsupportedOperationException(); } - @Deprecated - public final <A> CompletionState write(boolean block, long timeout, - TimeUnit unit, A attachment, CompletionCheck check, + /** + * Gather write. The completion handler will be called once some + * data has been written or an error occurred. The default NIO2 + * behavior is used: the completion handler will be called, even + * if the write is incomplete and data remains in the buffers, or + * if the write completed inline. + * + * @param timeout timeout duration for the write + * @param unit units for the timeout duration + * @param attachment an object to attach to the I/O operation that will be + * used when calling the completion handler + * @param handler to call when the IO is complete + * @param srcs buffers + * @param <A> The attachment type + * @return the completion state (done, done inline, or still pending) + */ + public final <A> CompletionState write(long timeout, TimeUnit unit, A attachment, CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) { - return write(block ? BlockingMode.BLOCK : BlockingMode.NON_BLOCK, - timeout, unit, attachment, check, handler, srcs); + if (srcs == null) { + throw new IllegalArgumentException(); + } + return write(srcs, 0, srcs.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler); } /** diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index b0f6983..e3e1313 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -52,6 +52,22 @@ </add> </changelog> </subsection> + <subsection name="Coyote"> + <changelog> + <update> + Add additional NIO2 style read and write methods closer to core NIO2, + for possible use with an asynchronous workflow like CompletableFuture. + (remm) + </update> + <fix> + Avoid useless exception wrapping in async IO. (remm) + </fix> + <fix> + <bug>63412</bug>: Security manager failure when using the async IO + API from a webapp. (remm) + </fix> + </changelog> + </subsection> <subsection name="Other"> <changelog> <update>Switch from FindBugs to SpotBugs. (fschumacher)</update> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org