This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new 0b7f845 Avoid blocking write of internal buffer 0b7f845 is described below commit 0b7f845e191e6e88fe364190e4adfa6900d8e468 Author: remm <r...@apache.org> AuthorDate: Tue May 14 16:20:54 2019 +0200 Avoid blocking write of internal buffer This introduces some "useless" code, but this is to align with NIO and APR code before moving code into SocketWrapperBase. There are a couple of slightly risky changes (the async write of the write buffer, and delaying setting the read/writeNotify flags until after getting the semaphores) that could cause CI instability again (as usual the testsuite passes for me ...). --- java/org/apache/tomcat/util/net/Nio2Endpoint.java | 215 +++++++++++----------- webapps/docs/changelog.xml | 3 +- 2 files changed, 107 insertions(+), 111 deletions(-) diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index b56cbad..8f2de8d 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -37,6 +37,7 @@ import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -953,22 +954,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS /** * Internal state tracker for scatter/gather operations. */ - private static class OperationState<A> { - private final boolean read; - private final ByteBuffer[] buffers; - private final int offset; - private final int length; - private final A attachment; - private final long timeout; - private final TimeUnit unit; - private final BlockingMode block; - private final CompletionCheck check; - private final CompletionHandler<Long, ? super A> handler; - private final Semaphore semaphore; - private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + protected class OperationState<A> implements Runnable { + protected final boolean read; + protected final ByteBuffer[] buffers; + protected final int offset; + protected final int length; + protected final A attachment; + protected final long timeout; + protected final TimeUnit unit; + protected final BlockingMode block; + protected final CompletionCheck check; + protected final CompletionHandler<Long, ? super A> handler; + protected final Semaphore semaphore; + protected final VectoredIOCompletionHandler<A> completion; + protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler, - Semaphore semaphore) { + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { this.read = read; this.buffers = buffers; this.offset = offset; @@ -980,102 +982,115 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS this.check = check; this.handler = handler; this.semaphore = semaphore; + this.completion = completion; } - private volatile long nBytes = 0; - private volatile CompletionState state = CompletionState.PENDING; - } + protected volatile long nBytes = 0; + protected volatile CompletionState state = CompletionState.PENDING; - @Override - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(getReadTimeout()); + public boolean isInline() { + return Nio2Endpoint.isInline(); } - // Disable any regular read notifications caused by registerReadInterest - readNotify = true; - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { + + public boolean process() { try { - if (!readPending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (!readPending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(new ReadPendingException(), attachment); - return CompletionState.ERROR; - } + getEndpoint().getExecutor().execute(this); + } catch (RejectedExecutionException ree) { + log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree); + return false; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; } + return true; } - OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, - timeout, unit, attachment, check, handler, readPending); - VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); - Nio2Endpoint.startInline(); - long nBytes = 0; - if (!socketBufferHandler.isReadBufferEmpty()) { - // There is still data inside the main read buffer, use it to fill out the destination buffers - synchronized (readCompletionHandler) { - // Note: It is not necessary to put this code in the completion handler - socketBufferHandler.configureReadBufferForRead(); - for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { - nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]); - } - } - if (nBytes > 0) { - completion.completed(Long.valueOf(nBytes), state); + + public void start() { + if (read) { + // Disable any regular read notifications caused by registerReadInterest + readNotify = true; + } else { + // Disable any regular write notifications caused by registerWriteInterest + writeNotify = true; } + Nio2Endpoint.startInline(); + run(); + Nio2Endpoint.endInline(); } - if (nBytes == 0) { - getSocket().read(dsts, offset, length, timeout, unit, state, completion); - } - Nio2Endpoint.endInline(); - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; + + @Override + public void run() { + if (read) { + long nBytes = 0; + if (!socketBufferHandler.isReadBufferEmpty()) { + // There is still data inside the main read buffer, use it to fill out the destination buffers + synchronized (readCompletionHandler) { + // Note: It is not necessary to put this code in the completion handler + socketBufferHandler.configureReadBufferForRead(); + for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { + nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]); } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; } + if (nBytes > 0) { + completion.completed(Long.valueOf(nBytes), this); + } + } + if (nBytes == 0) { + getSocket().read(buffers, offset, length, timeout, unit, this, completion); } + } else { + if (!socketBufferHandler.isWriteBufferEmpty()) { + // First flush the main buffer as needed + socketBufferHandler.configureWriteBufferForRead(); + getSocket().write(socketBufferHandler.getWriteBuffer(), null, new CompletionHandler<Integer, Void>() { + @Override + public void completed(Integer result, Void attachment) { + run(); + } + @Override + public void failed(Throwable exc, Void attachment) { + handler.failed(exc, OperationState.this.attachment); + } + }); + return; + } + // It should be less necessary to check the buffer state as it is easy to flush before + getSocket().write(buffers, offset, length, timeout, unit, this, completion); } } - return state.state; + } + + @Override + public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); } @Override public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); + } + + private <A> CompletionState readOrWrite(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { IOException ioe = getError(); if (ioe != null) { handler.failed(ioe, attachment); return CompletionState.ERROR; } if (timeout == -1) { - timeout = toTimeout(getWriteTimeout()); + timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); } - // Disable any regular write notifications caused by registerWriteInterest - writeNotify = true; if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { - if (!writePending.tryAcquire(timeout, unit)) { + if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); return CompletionState.ERROR; } @@ -1084,31 +1099,19 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS return CompletionState.ERROR; } } else { - if (!writePending.tryAcquire()) { + if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { if (block == BlockingMode.NON_BLOCK) { return CompletionState.NOT_DONE; } else { - handler.failed(new WritePendingException(), attachment); + handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); return CompletionState.ERROR; } } } - if (!socketBufferHandler.isWriteBufferEmpty()) { - // First flush the main buffer as needed - try { - doWrite(true); - } catch (IOException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } - OperationState<A> state = new OperationState<>(false, srcs, offset, length, block, - timeout, unit, attachment, check, handler, writePending); VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); - Nio2Endpoint.startInline(); - // It should be less necessary to check the buffer state as it is easy to flush before - getSocket().write(srcs, offset, length, timeout, unit, state, completion); - Nio2Endpoint.endInline(); + OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit, + attachment, check, handler, read ? readPending : writePending, completion); + state.start(); if (block == BlockingMode.BLOCK) { synchronized (state) { if (state.state == CompletionState.PENDING) { @@ -1134,7 +1137,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS failed(new EOFException(), state); } else { state.nBytes += nBytes.longValue(); - CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE; + CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE; boolean complete = true; boolean completion = true; if (state.check != null) { @@ -1163,13 +1166,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } } } else { - if (state.read) { - getSocket().read(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); - } else { - getSocket().write(state.buffers, state.offset, state.length, - state.timeout, state.unit, state, this); - } + state.run(); } } } @@ -1188,14 +1185,14 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS if (state.block == BlockingMode.BLOCK) { notify = true; } else { - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; } if (state.handler != null) { state.handler.failed(exc, state.attachment); } if (notify) { synchronized (state) { - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; state.notify(); } } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 4f83bb2..1bbaa2f 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -106,8 +106,7 @@ default due to low performance. (remm) </add> <fix> - Avoid blocking write of internal buffer for NIO when using async IO. - (remm) + Avoid blocking write of internal buffer when using async IO. (remm) </fix> </changelog> </subsection> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org