Author: remm Date: Tue Mar 28 08:15:05 2017 New Revision: 1789065 URL: http://svn.apache.org/viewvc?rev=1789065&view=rev Log: Better blocking for async IO.
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1789065&r1=1789064&r2=1789065&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Mar 28 08:15:05 2017 @@ -852,14 +852,16 @@ public class Nio2Endpoint extends Abstra private final A attachment; private final long timeout; private final TimeUnit unit; + private final BlockingMode block; private final CompletionCheck check; private final CompletionHandler<Long, ? super A> handler; private OperationState(ByteBuffer[] buffers, int offset, int length, - long timeout, TimeUnit unit, A attachment, CompletionCheck check, - CompletionHandler<Long, ? super A> handler) { + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { this.buffers = buffers; this.offset = offset; this.length = length; + this.block = block; this.timeout = timeout; this.unit = unit; this.attachment = attachment; @@ -894,7 +896,14 @@ public class Nio2Endpoint extends Abstra } if (complete) { readPending.release(); - state.state = currentState; + if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { + synchronized (this) { + state.state = currentState; + notify(); + } + } else { + state.state = currentState; + } if (completion && state.handler != null) { state.handler.completed(Long.valueOf(state.nBytes), state.attachment); } @@ -914,11 +923,18 @@ public class Nio2Endpoint extends Abstra } setError(ioe); readPending.release(); + if (state.block == BlockingMode.BLOCK) { + synchronized (this) { + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + notify(); + } + } else { + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + } if (exc instanceof AsynchronousCloseException) { // If already closed, don't call onError and close again return; } - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; if (state.handler != null) { state.handler.failed(ioe, state.attachment); } @@ -949,7 +965,14 @@ public class Nio2Endpoint extends Abstra } if (complete) { writePending.release(); - state.state = currentState; + if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { + synchronized (this) { + state.state = currentState; + notify(); + } + } else { + state.state = currentState; + } if (completion && state.handler != null) { state.handler.completed(Long.valueOf(state.nBytes), state.attachment); } @@ -969,7 +992,14 @@ public class Nio2Endpoint extends Abstra } setError(ioe); writePending.release(); - state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + if (state.block == BlockingMode.BLOCK) { + synchronized (this) { + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + notify(); + } + } else { + state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE; + } if (state.handler != null) { state.handler.failed(ioe, state.attachment); } @@ -995,14 +1025,25 @@ public class Nio2Endpoint extends Abstra return CompletionState.NOT_DONE; } } - OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler); + OperationState<A> state = new OperationState<>(dsts, offset, length, block, timeout, unit, attachment, check, handler); + ScatterReadCompletionHandler<A> completion = new ScatterReadCompletionHandler<>(); Nio2Endpoint.startInline(); - getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>()); + getSocket().read(dsts, offset, length, timeout, unit, state, completion); Nio2Endpoint.endInline(); - if (block == BlockingMode.BLOCK && state.state == CompletionState.PENDING) { - if (!awaitReadComplete(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; + if (block == BlockingMode.BLOCK) { + synchronized (completion) { + if (state.state == CompletionState.PENDING) { + try { + completion.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } } } return state.state; @@ -1016,9 +1057,14 @@ public class Nio2Endpoint extends Abstra } @Override - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, + public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } if (block != BlockingMode.NON_BLOCK) { try { if (!writePending.tryAcquire(timeout, unit)) { @@ -1034,14 +1080,25 @@ public class Nio2Endpoint extends Abstra return CompletionState.NOT_DONE; } } - OperationState<A> state = new OperationState<>(srcs, offset, length, timeout, unit, attachment, check, handler); + OperationState<A> state = new OperationState<>(srcs, offset, length, block, timeout, unit, attachment, check, handler); + GatherWriteCompletionHandler<A> completion = new GatherWriteCompletionHandler<>(); Nio2Endpoint.startInline(); - getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<>()); + getSocket().write(srcs, offset, length, timeout, unit, state, completion); Nio2Endpoint.endInline(); - if (block == BlockingMode.BLOCK && state.state == CompletionState.PENDING) { - if (!awaitWriteComplete(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; + if (block == BlockingMode.BLOCK) { + synchronized (completion) { + if (state.state == CompletionState.PENDING) { + try { + completion.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } } } return state.state; @@ -1282,11 +1339,13 @@ public class Nio2Endpoint extends Abstra try { if (readPending.tryAcquire(timeout, unit)) { readPending.release(); + return true; + } else { + return false; } } catch (InterruptedException e) { return false; } - return true; } @@ -1295,11 +1354,13 @@ public class Nio2Endpoint extends Abstra try { if (writePending.tryAcquire(timeout, unit)) { writePending.release(); + return true; + } else { + return false; } } catch (InterruptedException e) { return false; } - return true; } /* --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org