Author: remm
Date: Tue Apr  7 16:43:58 2015
New Revision: 1671886

URL: http://svn.apache.org/r1671886
Log:
- Start experimenting with a state wrapper over NIO2 for scatter/gather 
operations.
- This effectively bypasses the processor (the completion handler would get to 
call it if it wants to).
- All in a single block for easy removal if not useful in the end.
Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1671886&r1=1671885&r2=1671886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Apr  7 
16:43:58 2015
@@ -31,6 +31,8 @@ import java.nio.channels.AsynchronousSoc
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
@@ -1041,6 +1043,244 @@ public class Nio2Endpoint extends Abstra
             }
         }
 
+        // TODO: NIO2 style scatter/gather methods.
+        // TODO: SecureNio2Channel gather would need to be improved
+
+        /**
+         * Internal state tracker for scatter/gather operations.
+         * <p>
+         * One instance is created per read/write call and is handed to the
+         * channel as the attachment of the asynchronous operation, so the
+         * internal completion handler can reach the caller's buffers,
+         * timeout, {@link CompletionCheck}, completion handler and
+         * attachment.
+         *
+         * @param <A> type of the attachment passed back to the caller's
+         *        completion handler
+         */
+        private class OperationState<A> {
+            private final ByteBuffer[] buffers;
+            private final int offset;
+            private final int length;
+            private final A attachment;
+            private final long timeout;
+            private final TimeUnit unit;
+            private final CompletionCheck check;
+            private final CompletionHandler<Long, ? super A> handler;
+            private OperationState(ByteBuffer[] buffers, int offset, int 
length,
+                    long timeout, TimeUnit unit, A attachment, CompletionCheck 
check,
+                    CompletionHandler<Long, ? super A> handler) {
+                this.buffers = buffers;
+                this.offset = offset;
+                this.length = length;
+                this.timeout = timeout;
+                this.unit = unit;
+                this.attachment = attachment;
+                this.check = check;
+                this.handler = handler;
+                this.pos = offset;
+            }
+            private long nBytes = 0; // running total of bytes reported by completed operations
+            private int pos; // starts at offset; NOTE(review): never advanced anywhere in the visible code
+            private CompletionState state = CompletionState.PENDING; // value returned to the caller of read/write
+        }
+
+        private class GatherWriteCompletionHandler<A> implements 
CompletionHandler<Long, OperationState<A>> { // internal handler for gather writes; the OperationState travels as the attachment
+            @Override
+            public void completed(Long nBytes, OperationState<A> state) {
+                if (nBytes.longValue() < 0) { // a negative count is reported to the caller as EOF via failed()
+                    failed(new EOFException(), state);
+                } else {
+                    state.nBytes += nBytes.longValue(); // accumulate across re-issued writes
+                    if (state.pos == state.offset + state.length) { // NOTE(review): pos is only ever set to offset in the visible code, so this holds only when length == 0
+                        writePending.release();
+                        state.state = Nio2Endpoint.isInline() ? 
CompletionState.INLINE : CompletionState.DONE;
+                        if (state.check == null
+                                || state.check.callHandler(state.state, 
state.buffers, state.offset, state.length)
+                                    == CompletionHandlerCall.DONE) {
+                            
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        }
+                    } else {
+                        if (state.check == null) {
+                            // No check supplied: release the permit and notify the caller after this write.
+                            // NOTE(review): state.state is not updated on this path, so it stays PENDING.
+                            writePending.release();
+                            
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        } else {
+                            boolean inline = Nio2Endpoint.isInline();
+                            switch (state.check.callHandler(inline ? 
CompletionState.INLINE : CompletionState.DONE,
+                                        state.buffers, state.offset, 
state.length)) {
+                            case CONTINUE: // re-issue the write with the same buffers and this handler; the permit stays held
+                                getSocket().write(state.buffers, state.offset, 
state.length,
+                                        state.timeout, state.unit, state, 
this);
+                                break;
+                            case DONE: // finished: release the permit, record the state, notify the caller
+                                writePending.release();
+                                state.state = inline ? CompletionState.INLINE 
: CompletionState.DONE;
+                                
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                                break;
+                            case NONE: // finished, but the caller's handler is deliberately not called
+                                writePending.release();
+                                state.state = inline ? CompletionState.INLINE 
: CompletionState.DONE;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, OperationState<A> state) {
+                IOException ioe; // the caller's handler always receives an IOException
+                if (exc instanceof IOException) {
+                    ioe = (IOException) exc;
+                } else {
+                    ioe = new IOException(exc); // wrap anything else, keeping it as the cause
+                }
+                Nio2SocketWrapper.this.setError(ioe); // record the error on the enclosing socket wrapper
+                writePending.release();
+                state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE 
: CompletionState.DONE;
+                state.handler.failed(ioe, state.attachment);
+            }
+        }
+
+        public enum CompletionState {
+            /**
+             * Operation is pending and the completion handler will
+             * be called later. Every operation starts in this state.
+             */
+            PENDING,
+            /**
+             * The operation completed inline, i.e. the internal completion
+             * handler ran while Nio2Endpoint.isInline() returned true. The
+             * bundled checks (COMPLETE_WRITE, READ_DATA) do not request a
+             * call to the completion handler in this case; errors are
+             * still reported through its failed method.
+             */
+            INLINE,
+            /**
+             * The operation completed, but not inline.
+             */
+            DONE
+        }
+
+        public enum CompletionHandlerCall {
+            /**
+             * Operation should continue, the completion handler shouldn't be
+             * called. For a gather write the same buffers are submitted
+             * to the channel again.
+             */
+            CONTINUE,
+            /**
+             * The operation completed but the completion handler shouldn't be
+             * called. This is possibly useful if the operation completed
+             * inline, since the caller can then act on the returned
+             * CompletionState instead.
+             */
+            NONE,
+            /**
+             * The operation is complete, call the completion handler.
+             */
+            DONE
+        }
+
+        public interface CompletionCheck {
+            /**
+             * Decide what happens after an IO call completed: continue the
+             * operation, finish it without notification, or finish it and
+             * call the completion handler.
+             * @param state whether the completion is being processed inline
+             *        or not, as determined by the caller
+             * @param buffers ByteBuffer[] that has been passed to the
+             *        original IO call
+             * @param offset that has been passed to the original IO call
+             * @param length that has been passed to the original IO call
+             * @return the action to take, see {@link CompletionHandlerCall}
+             */
+            public CompletionHandlerCall callHandler(CompletionState state, 
ByteBuffer[] buffers, int offset, int length);
+        }
+
+        /**
+         * This utility CompletionCheck will cause the write to fully write
+         * all remaining data. The operation continues while any of the
+         * {@code length} buffers starting at index {@code offset} still has
+         * bytes remaining. Once all of them are drained, the completion
+         * handler is requested only for a non-inline completion
+         * ({@link CompletionState#DONE}); otherwise
+         * {@link CompletionHandlerCall#NONE} is returned.
+         */
+        public static final CompletionCheck COMPLETE_WRITE = new 
CompletionCheck() {
+            public CompletionHandlerCall callHandler(CompletionState state, 
ByteBuffer[] buffers, int offset, int length) {
+                // Inspect exactly the buffers that took part in the write,
+                // i.e. indices offset .. offset + length - 1.
+                for (int i = offset; i < offset + length; i++) {
+                    if (buffers[i].remaining() > 0) {
+                        return CompletionHandlerCall.CONTINUE;
+                    }
+                }
+                return (state == CompletionState.DONE) ? 
CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+            }
+        };
+
+        /**
+         * This utility CompletionCheck will cause the completion handler
+         * to be called once some data has been read. It never asks for the
+         * operation to continue and does not look at the buffers: the
+         * handler call is requested for a non-inline completion
+         * ({@link CompletionState#DONE}) and suppressed otherwise.
+         */
+        public static final CompletionCheck READ_DATA = new CompletionCheck() {
+            public CompletionHandlerCall callHandler(CompletionState state, 
ByteBuffer[] buffers, int offset, int length) {
+                return (state == CompletionState.DONE) ? 
CompletionHandlerCall.DONE : CompletionHandlerCall.NONE;
+            }
+        };
+
+        /**
+         * Scatter read. The completion handler is meant to be called once
+         * some data has been read or an error occurred. If a CompletionCheck
+         * object has been provided, its callHandler method decides whether
+         * the completion handler is called. If no CompletionCheck object
+         * has been provided, the completion handler will be called.
+         * <p>
+         * NOTE(review): in this revision the channel read itself is still
+         * commented out (see the FIXME in the body). The method acquires a
+         * readPending permit, starts no IO and returns the initial
+         * PENDING state; nothing visible here releases the permit again.
+         * @param dsts buffers to read into
+         * @param offset index of the first buffer to use
+         * @param length number of buffers to use
+         * @param timeout timeout for the IO operation
+         * @param unit unit of the timeout
+         * @param attachment object passed to the completion handler
+         * @param check optional check deciding on handler notification,
+         *        may be null
+         * @param handler completion handler to notify
+         * @return the state of the operation at the time this method returns
+         * @throws ReadPendingException if no readPending permit could be
+         *         acquired
+         */
+        // FIXME: @Override
+        public <A> CompletionState read(ByteBuffer[] dsts,
+                int offset, int length,
+                long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check,
+                CompletionHandler<Long, ? super A> handler) {
+            OperationState<A> state = new OperationState<A>(dsts, offset, 
length, timeout, unit, attachment, check, handler);
+            if (readPending.tryAcquire()) {
+                Nio2Endpoint.startInline();
+                // FIXME: Add scatter read to Nio2Channel and 
ScatterReadCompletionHandler class
+                //getSocket().read(dsts, offset, length, timeout, unit, state, 
new ScatterReadCompletionHandler<>());
+                Nio2Endpoint.endInline();
+            } else {
+                throw new ReadPendingException();
+            }
+            return state.state;
+        }
+
+        public boolean isWritePending() { // reports whether writePending currently has no available permit
+            synchronized (writeCompletionHandler) { // NOTE(review): lock object is declared outside this hunk; presumably shared with other writePending users - confirm
+                return writePending.availablePermits() == 0;
+            }
+        }
+
+        /**
+         * Gather write. The completion handler will be called once some
+         * data has been written or an error occurred. If a CompletionCheck
+         * object has been provided, its callHandler method decides whether
+         * the write continues and whether the completion handler is
+         * called. If no CompletionCheck object has been provided, the
+         * completion handler will be called.
+         * <p>
+         * NOTE(review): if the channel write throws, endInline() is
+         * skipped and the writePending permit acquired here is not
+         * released by this method - confirm whether that path needs
+         * handling.
+         * @param srcs buffers to write from
+         * @param offset index of the first buffer to use
+         * @param length number of buffers to use
+         * @param timeout timeout for the IO operation
+         * @param unit unit of the timeout
+         * @param attachment object passed to the completion handler
+         * @param check optional check deciding on continuation and handler
+         *        notification, may be null
+         * @param handler completion handler to notify
+         * @return the state of the operation at the time this method
+         *         returns; PENDING unless the internal completion handler
+         *         has already updated it
+         * @throws WritePendingException if no writePending permit could be
+         *         acquired
+         */
+        // FIXME: @Override
+        public <A> CompletionState write(ByteBuffer[] srcs,
+                int offset, int length,
+                long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check,
+                CompletionHandler<Long, ? super A> handler) {
+            OperationState<A> state = new OperationState<A>(srcs, offset, 
length, timeout, unit, attachment, check, handler);
+            if (writePending.tryAcquire()) {
+                Nio2Endpoint.startInline();
+                getSocket().write(srcs, offset, length, timeout, unit, state, 
new GatherWriteCompletionHandler<>());
+                Nio2Endpoint.endInline();
+            } else {
+                throw new WritePendingException();
+            }
+            return state.state;
+        }
+
+        // TODO: End NIO2 style scatter/gather methods.
 
         /* Callers of this method must:
          * - have acquired the readPending semaphore



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to