Author: remm Date: Tue Apr 7 16:43:58 2015 New Revision: 1671886 URL: http://svn.apache.org/r1671886 Log: - Start experimenting with a state wrapper over NIO2 for scatter/gather operations. - This effectively bypasses the processor (the completion handler would get to call it if it wants to). - All in a single block for easy removal if not useful in the end.
Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1671886&r1=1671885&r2=1671886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Apr 7 16:43:58 2015
@@ -31,6 +31,8 @@ import java.nio.channels.AsynchronousSoc
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
@@ -1041,6 +1043,244 @@ public class Nio2Endpoint extends Abstra
             }
         }
 
+        // TODO: NIO2 style scatter/gather methods.
+        // TODO: SecureNio2Channel gather would need to be improved
+
+        /**
+         * Internal state tracker for scatter/gather operations. It keeps the
+         * arguments of the original call together with the running byte
+         * count and the completion state that is reported to the caller.
+         */
+        private class OperationState<A> {
+            private final ByteBuffer[] buffers;
+            private final int offset;
+            private final int length;
+            private final A attachment;
+            private final long timeout;
+            private final TimeUnit unit;
+            private final CompletionCheck check;
+            private final CompletionHandler<Long, ? super A> handler;
+            private OperationState(ByteBuffer[] buffers, int offset, int length,
+                    long timeout, TimeUnit unit, A attachment, CompletionCheck check,
+                    CompletionHandler<Long, ? super A> handler) {
+                this.buffers = buffers;
+                this.offset = offset;
+                this.length = length;
+                this.timeout = timeout;
+                this.unit = unit;
+                this.attachment = attachment;
+                this.check = check;
+                this.handler = handler;
+                this.pos = offset;
+            }
+            // Total number of bytes transferred so far for this operation
+            private long nBytes = 0;
+            // NOTE(review): initialised to offset and never advanced by the
+            // code in this block
+            private int pos;
+            // Updated by the completion handler when the operation ends or fails
+            private CompletionState state = CompletionState.PENDING;
+        }
+
+        /**
+         * Completion handler used for gather writes. It accumulates the
+         * number of bytes written, asks the optional CompletionCheck what to
+         * do next and then either issues another write, notifies the caller's
+         * handler, or finishes without notification. The writePending
+         * semaphore is released whenever the operation ends.
+         */
+        private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+            @Override
+            public void completed(Long nBytes, OperationState<A> state) {
+                if (nBytes.longValue() < 0) {
+                    // A negative count is reported to the caller as an EOF failure
+                    failed(new EOFException(), state);
+                } else {
+                    state.nBytes += nBytes.longValue();
+                    // NOTE(review): pos is never updated, so this branch is only
+                    // taken when length is 0 - confirm the intent
+                    if (state.pos == state.offset + state.length) {
+                        writePending.release();
+                        state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+                        if (state.check == null
+                                || state.check.callHandler(state.state, state.buffers, state.offset, state.length)
+                                == CompletionHandlerCall.DONE) {
+                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        }
+                    } else {
+                        if (state.check == null) {
+                            // Call completion handler
+                            writePending.release();
+                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        } else {
+                            boolean inline = Nio2Endpoint.isInline();
+                            switch (state.check.callHandler(inline ? CompletionState.INLINE : CompletionState.DONE,
+                                    state.buffers, state.offset, state.length)) {
+                            case CONTINUE:
+                                // Keep the permit and write again with the same state
+                                getSocket().write(state.buffers, state.offset, state.length,
+                                        state.timeout, state.unit, state, this);
+                                break;
+                            case DONE:
+                                writePending.release();
+                                state.state = inline ? CompletionState.INLINE : CompletionState.DONE;
+                                state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                                break;
+                            case NONE:
+                                // Finished, but the caller's handler is not notified
+                                writePending.release();
+                                state.state = inline ? CompletionState.INLINE : CompletionState.DONE;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, OperationState<A> state) {
+                // The caller's handler always receives an IOException
+                IOException ioe;
+                if (exc instanceof IOException) {
+                    ioe = (IOException) exc;
+                } else {
+                    ioe = new IOException(exc);
+                }
+                Nio2SocketWrapper.this.setError(ioe);
+                writePending.release();
+                state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+                state.handler.failed(ioe, state.attachment);
+            }
+        }
+
+        public enum CompletionState {
+            /**
+             * Operation is pending and the completion handler will
+             * be called later.
+             */
+            PENDING,
+            /**
+             * The operation completed inline, and the completion handler
+             * will not be called unless an error occurred.
+             */
+            INLINE,
+            /**
+             * The operation completed, but not inline.
+             */
+            DONE
+        }
+
+        public enum CompletionHandlerCall {
+            /**
+             * Operation should continue, the completion handler shouldn't be
+             * called.
+             */
+            CONTINUE,
+            /**
+             * The operation completed but the completion handler shouldn't be
+             * called. This is possibly useful if the operation completed
+             * inline.
+             */
+            NONE,
+            /**
+             * The operation is complete, call the completion handler.
+             */
+            DONE
+        }
+
+        public interface CompletionCheck {
+            /**
+             * Decide whether enough data has been read or written and
+             * whether the completion handler should be notified.
+             * @param state of the operation (done or done inline since the
+             *        IO call is done)
+             * @param buffers ByteBuffer[] that has been passed to the
+             *        original IO call
+             * @param offset that has been passed to the original IO call
+             * @param length that has been passed to the original IO call
+             * @return CONTINUE if the operation should go on, DONE if the
+             *         completion handler should be called, NONE if the
+             *         operation is complete but the handler should not
+             *         be called
+             */
+            public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length);
+        }
+
+        /**
+         * This utility CompletionCheck will cause the write to fully write
+         * all remaining data: the operation continues while any buffer in
+         * the range [offset, offset + length) still has remaining bytes.
+         */
+        public static final CompletionCheck COMPLETE_WRITE = new CompletionCheck() {
+            @Override
+            public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) {
+                // Only the buffers that were handed to the IO call matter
+                for (int i = 0; i < length; i++) {
+                    if (buffers[offset + i].hasRemaining()) {
+                        return CompletionHandlerCall.CONTINUE;
+                    }
+                }
+                return (state == CompletionState.DONE) ? CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+            }
+        };
+
+        /**
+         * This utility CompletionCheck will cause the completion handler
+         * to be called once some data has been read.
+         */
+        public static final CompletionCheck READ_DATA = new CompletionCheck() {
+            @Override
+            public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) {
+                return (state == CompletionState.DONE) ? CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+            }
+        };
+
+        /**
+         * Scatter read. The completion handler will be called once some
+         * data has been read or an error occurred. If a CompletionCheck
+         * object has been provided, the completion handler will only be
+         * called if the callHandler method returned DONE. If no
+         * CompletionCheck object has been provided, the completion handler
+         * will be called.
+         * @param dsts buffers
+         * @param offset in the buffer array
+         * @param length in the buffer array
+         * @param timeout timeout duration for the read
+         * @param unit units for the timeout duration
+         * @param attachment an object to attach to the I/O operation that
+         *        will be passed to the completion handler
+         * @param check for the IO operation completion, may be null
+         * @param handler to call when the IO is complete
+         * @return the completion state recorded for the operation when
+         *         this method returns
+         * @throws ReadPendingException if the readPending semaphore could
+         *         not be acquired
+         */
+        // FIXME: @Override
+        public <A> CompletionState read(ByteBuffer[] dsts,
+                int offset, int length,
+                long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check,
+                CompletionHandler<Long, ? super A> handler) {
+            OperationState<A> state = new OperationState<A>(dsts, offset, length, timeout, unit, attachment, check, handler);
+            if (readPending.tryAcquire()) {
+                Nio2Endpoint.startInline();
+                // FIXME: Add scatter read to Nio2Channel and ScatterReadCompletionHandler class
+                // NOTE(review): until the read below is enabled nothing in this
+                // method releases the readPending permit acquired above
+                //getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>());
+                Nio2Endpoint.endInline();
+            } else {
+                throw new ReadPendingException();
+            }
+            return state.state;
+        }
+
+        public boolean isWritePending() {
+            synchronized (writeCompletionHandler) {
+                return writePending.availablePermits() == 0;
+            }
+        }
+
+        /**
+         * Gather write. The completion handler will be called once some
+         * data has been written or an error occurred. If a CompletionCheck
+         * object has been provided, the completion handler will only be
+         * called if the callHandler method returned DONE. If no
+         * CompletionCheck object has been provided, the completion handler
+         * will be called.
+         * @param srcs buffers
+         * @param offset in the buffer array
+         * @param length in the buffer array
+         * @param timeout timeout duration for the write
+         * @param unit units for the timeout duration
+         * @param attachment an object to attach to the I/O operation that
+         *        will be passed to the completion handler
+         * @param check for the IO operation completion, may be null
+         * @param handler to call when the IO is complete
+         * @return the completion state recorded for the operation when
+         *         this method returns (PENDING unless the completion
+         *         handler has already run)
+         * @throws WritePendingException if the writePending semaphore could
+         *         not be acquired
+         */
+        // FIXME: @Override
+        public <A> CompletionState write(ByteBuffer[] srcs,
+                int offset, int length,
+                long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check,
+                CompletionHandler<Long, ? super A> handler) {
+            OperationState<A> state = new OperationState<A>(srcs, offset, length, timeout, unit, attachment, check, handler);
+            if (writePending.tryAcquire()) {
+                Nio2Endpoint.startInline();
+                try {
+                    getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<>());
+                } finally {
+                    // Always pair startInline() with endInline(), even if write() throws
+                    Nio2Endpoint.endInline();
+                }
+            } else {
+                throw new WritePendingException();
+            }
+            return state.state;
+        }
+
+        // TODO: End NIO2 style scatter/gather methods.
/* Callers of this method must: * - have acquired the readPending semaphore --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org