This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new eef4b21 Update async IO API from Tomcat 9 eef4b21 is described below commit eef4b218b7f23a6404e8e32f24dded9a0208cc4c Author: remm <r...@apache.org> AuthorDate: Mon Mar 4 00:03:04 2019 +0100 Update async IO API from Tomcat 9 Although not actually used in Tomcat 8.5, it is better (and easy) to keep it up to date. --- java/org/apache/tomcat/util/net/Nio2Endpoint.java | 260 +++++++++++++-------- .../apache/tomcat/util/net/SocketWrapperBase.java | 88 ++++--- webapps/docs/changelog.xml | 3 + 3 files changed, 216 insertions(+), 135 deletions(-) diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index eefb45b..547ed23 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -31,8 +31,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; import java.nio.channels.NetworkChannel; -import java.nio.channels.ReadPendingException; -import java.nio.channels.WritePendingException; import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -936,86 +934,166 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { * Internal state tracker for scatter/gather operations. */ private static class OperationState<A> { + private final boolean read; private final ByteBuffer[] buffers; private final int offset; private final int length; private final A attachment; private final long timeout; private final TimeUnit unit; + private final BlockingMode block; private final CompletionCheck check; private final CompletionHandler<Long, ? super A> handler; - private OperationState(ByteBuffer[] buffers, int offset, int length, - long timeout, TimeUnit unit, A attachment, CompletionCheck check, - CompletionHandler<Long, ? super A> handler) { + private final Semaphore semaphore; + private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore) { + this.read = read; this.buffers = buffers; this.offset = offset; this.length = length; + this.block = block; this.timeout = timeout; this.unit = unit; this.attachment = attachment; this.check = check; this.handler = handler; + this.semaphore = semaphore; } private volatile long nBytes = 0; private volatile CompletionState state = CompletionState.PENDING; } - private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { @Override - public void completed(Long nBytes, OperationState<A> state) { - if (nBytes.intValue() < 0) { - failed(new EOFException(), state); - } else { - state.nBytes += nBytes.longValue(); - CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE; - boolean complete = true; - boolean completion = true; - if (state.check != null) { - switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { - case CONTINUE: - complete = false; - break; - case DONE: - break; - case NONE: - completion = false; - break; + public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } + if (timeout == -1) { + timeout = toNio2Timeout(getReadTimeout()); } + if (block != BlockingMode.NON_BLOCK) { + try { + if (!readPending.tryAcquire(timeout, unit)) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; } - if (complete) { - readPending.release(); - state.state = currentState; - if (completion && state.handler != null) { - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); + } catch (InterruptedException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } else { + if (!readPending.tryAcquire()) { + return CompletionState.NOT_DONE; + } + } + OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, + timeout, unit, attachment, check, handler, readPending); + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); + Nio2Endpoint.startInline(); + long nBytes = 0; + if (!socketBufferHandler.isReadBufferEmpty()) { + // There is still data inside the main read buffer, use it to fill out the destination buffers + synchronized (readCompletionHandler) { + // Note: It is not necessary to put this code in the completion handler + socketBufferHandler.configureReadBufferForRead(); + for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { + nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]); + } + } + if (nBytes > 0) { + completion.completed(Long.valueOf(nBytes), state); + } + } + if (nBytes == 0) { + getSocket().read(dsts, offset, length, timeout, unit, state, completion); + } + Nio2Endpoint.endInline(); + if (block == BlockingMode.BLOCK) { + synchronized (state) { + if (state.state == CompletionState.PENDING) { + try { + state.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; } - } else { - getSocket().read(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); } } } + return state.state; + } + @Override - public void failed(Throwable exc, OperationState<A> state) { - IOException ioe; - if (exc instanceof IOException) { - ioe = (IOException) exc; - } else { - ioe = new IOException(exc); + public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } + if (timeout == -1) { + timeout = toNio2Timeout(getWriteTimeout()); + } + if (block != BlockingMode.NON_BLOCK) { + try { + if (!writePending.tryAcquire(timeout, unit)) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; } - setError(ioe); - readPending.release(); - if (exc instanceof AsynchronousCloseException) { - // If already closed, don't call onError and close again - return; + } else { + if (!writePending.tryAcquire()) { + return CompletionState.NOT_DONE; } - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; - if (state.handler != null) { - state.handler.failed(ioe, state.attachment); + } + if (!socketBufferHandler.isWriteBufferEmpty()) { + // First flush the main buffer as needed + try { + doWrite(true); + } catch (IOException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } + OperationState<A> state = new OperationState<>(false, srcs, offset, length, block, + timeout, unit, attachment, check, handler, writePending); + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); + Nio2Endpoint.startInline(); + // It should be less necessary to check the buffer state as it is easy to flush before + getSocket().write(srcs, offset, length, timeout, unit, state, completion); + Nio2Endpoint.endInline(); + if (block == BlockingMode.BLOCK) { + synchronized (state) { + if (state.state == CompletionState.PENDING) { + try { + state.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } } } + return state.state; } - private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { + private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { @Override public void completed(Long nBytes, OperationState<A> state) { if (nBytes.longValue() < 0) { @@ -1038,14 +1116,30 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { } } if (complete) { - writePending.release(); - state.state = currentState; + boolean notify = false; + state.semaphore.release(); + if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { + notify = true; + } else { + state.state = currentState; + } if (completion && state.handler != null) { state.handler.completed(Long.valueOf(state.nBytes), state.attachment); } + if (notify) { + synchronized (state) { + state.state = currentState; + state.notify(); + } + } } else { - getSocket().write(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); + if (state.read) { + getSocket().read(state.buffers, state.offset, state.length, + state.timeout, state.unit, state, this); + } else { + getSocket().write(state.buffers, state.offset, state.length, + state.timeout, state.unit, state, this); + } } } } @@ -1058,63 +1152,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { ioe = new IOException(exc); } setError(ioe); - writePending.release(); - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; - if (state.handler != null) { - state.handler.failed(ioe, state.attachment); - } - } - } - - @Override - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, - boolean block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler); - try { - if ((!block && readPending.tryAcquire()) || (block && readPending.tryAcquire(timeout, unit))) { - Nio2Endpoint.startInline(); - getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<A>()); - Nio2Endpoint.endInline(); + boolean notify = false; + state.semaphore.release(); + if (state.block == BlockingMode.BLOCK) { + notify = true; } else { - throw new ReadPendingException(); + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; } - if (block && state.state == CompletionState.PENDING && readPending.tryAcquire(timeout, unit)) { - readPending.release(); - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - } - return state.state; - } - - @Override - public boolean isWritePending() { - synchronized (writeCompletionHandler) { - return writePending.availablePermits() == 0; - } - } - - @Override - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, - boolean block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - OperationState<A> state = new OperationState<>(srcs, offset, length, timeout, unit, attachment, check, handler); - try { - if ((!block && writePending.tryAcquire()) || (block && writePending.tryAcquire(timeout, unit))) { - Nio2Endpoint.startInline(); - getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<A>()); - Nio2Endpoint.endInline(); - } else { - throw new WritePendingException(); + if (state.handler != null) { + state.handler.failed(ioe, state.attachment); } - if (block && state.state == CompletionState.PENDING && writePending.tryAcquire(timeout, unit)) { - writePending.release(); + if (notify) { + synchronized (state) { + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + state.notify(); + } } - } catch (InterruptedException e) { - handler.failed(e, attachment); } - return state.state; } /* Callers of this method must: diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index d8e5d22..4f40afd 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -789,12 +789,33 @@ public abstract class SocketWrapperBase<E> { // ------------------------------------------------------- NIO 2 style APIs + public enum BlockingMode { + /** + * The operation will now block. If there are pending operations, + * the operation will return CompletionState.NOT_DONE. + */ + NON_BLOCK, + /** + * The operation will block until pending operations are completed, but + * will not block after performing it. + */ + SEMI_BLOCK, + /** + * The operation will block until completed. + */ + BLOCK + } + public enum CompletionState { /** * Operation is still pending. */ PENDING, /** + * Operation was pending and non blocking. + */ + NOT_DONE, + /** * The operation completed inline. */ INLINE, @@ -853,8 +874,8 @@ public abstract class SocketWrapperBase<E> { @Override public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) { - for (int i = 0; i < offset; i++) { - if (buffers[i].remaining() > 0) { + for (int i = 0; i < length; i++) { + if (buffers[offset + i].remaining() > 0) { return CompletionHandlerCall.CONTINUE; } } @@ -864,6 +885,23 @@ public abstract class SocketWrapperBase<E> { }; /** + * This utility CompletionCheck will cause the write to fully write + * all remaining data. The completion handler will then be called. + */ + public static final CompletionCheck COMPLETE_WRITE_WITH_COMPLETION = new CompletionCheck() { + @Override + public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, + int offset, int length) { + for (int i = 0; i < length; i++) { + if (buffers[offset + i].remaining() > 0) { + return CompletionHandlerCall.CONTINUE; + } + } + return CompletionHandlerCall.DONE; + } + }; + + /** * This utility CompletionCheck will cause the completion handler * to be called once some data has been read. If the operation * completes inline, the completion handler will not be called. @@ -942,11 +980,7 @@ public abstract class SocketWrapperBase<E> { * behavior is used: the completion handler will be called as soon * as some data has been read, even if the read has completed inline. * - * @param block true to block until any pending read is done, if the - * timeout occurs and a read is still pending, a - * ReadPendingException will be thrown; false to - * not block but any pending read operation will cause - * a ReadPendingException + * @param block is the blocking mode that will be used for this operation * @param timeout timeout duration for the read * @param unit units for the timeout duration * @param attachment an object to attach to the I/O operation that will be @@ -957,8 +991,9 @@ public abstract class SocketWrapperBase<E> { * @param <A> The attachment type * @return the completion state (done, done inline, or still pending) */ - public final <A> CompletionState read(boolean block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) { + public final <A> CompletionState read(BlockingMode block, long timeout, + TimeUnit unit, A attachment, CompletionCheck check, + CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) { if (dsts == null) { throw new IllegalArgumentException(); } @@ -977,11 +1012,7 @@ public abstract class SocketWrapperBase<E> { * @param dsts buffers * @param offset in the buffer array * @param length in the buffer array - * @param block true to block until any pending read is done, if the - * timeout occurs and a read is still pending, a - * ReadPendingException will be thrown; false to - * not block but any pending read operation will cause - * a ReadPendingException + * @param block is the blocking mode that will be used for this operation * @param timeout timeout duration for the read * @param unit units for the timeout duration * @param attachment an object to attach to the I/O operation that will be @@ -991,9 +1022,9 @@ public abstract class SocketWrapperBase<E> { * @param <A> The attachment type * @return the completion state (done, done inline, or still pending) */ - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, boolean block, - long timeout, TimeUnit unit, A attachment, CompletionCheck check, - CompletionHandler<Long, ? super A> handler) { + public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { throw new UnsupportedOperationException(); } @@ -1007,11 +1038,7 @@ public abstract class SocketWrapperBase<E> { * if the write is incomplete and data remains in the buffers, or * if the write completed inline. * - * @param block true to block until any pending write is done, if the - * timeout occurs and a write is still pending, a - * WritePendingException will be thrown; false to - * not block but any pending write operation will cause - * a WritePendingException + * @param block is the blocking mode that will be used for this operation * @param timeout timeout duration for the write * @param unit units for the timeout duration * @param attachment an object to attach to the I/O operation that will be @@ -1022,8 +1049,9 @@ public abstract class SocketWrapperBase<E> { * @param <A> The attachment type * @return the completion state (done, done inline, or still pending) */ - public final <A> CompletionState write(boolean block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) { + public final <A> CompletionState write(BlockingMode block, long timeout, + TimeUnit unit, A attachment, CompletionCheck check, + CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) { if (srcs == null) { throw new IllegalArgumentException(); } @@ -1043,11 +1071,7 @@ public abstract class SocketWrapperBase<E> { * @param srcs buffers * @param offset in the buffer array * @param length in the buffer array - * @param block true to block until any pending write is done, if the - * timeout occurs and a write is still pending, a - * WritePendingException will be thrown; false to - * not block but any pending write operation will cause - * a WritePendingException + * @param block is the blocking mode that will be used for this operation * @param timeout timeout duration for the write * @param unit units for the timeout duration * @param attachment an object to attach to the I/O operation that will be @@ -1057,9 +1081,9 @@ public abstract class SocketWrapperBase<E> { * @param <A> The attachment type * @return the completion state (done, done inline, or still pending) */ - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, boolean block, - long timeout, TimeUnit unit, A attachment, CompletionCheck check, - CompletionHandler<Long, ? super A> handler) { + public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { throw new UnsupportedOperationException(); } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 38ced8e..e620ee1 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -135,6 +135,9 @@ <a href="https://bugs.openjdk.java.net/browse/JDK-8157404">JRE KeyStore loading bug</a>. (markt) </add> + <update> + Sync with NIO2 async API from Tomcat 9 branch. (remm) + </update> </changelog> </subsection> <subsection name="WebSocket"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org