Author: markt
Date: Thu Jan 15 09:21:08 2015
New Revision: 1652002

URL: http://svn.apache.org/r1652002
Log:
NIO refactoring
 - Use read from socketWrapper rather than HttpNio2InputBuffer
 - Various API tweaks to support the above

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java Thu Jan 
15 09:21:08 2015
@@ -85,7 +85,7 @@ public class Http11Nio2Processor extends
     @Override
     public SocketState asyncDispatch(SocketStatus status) {
         SocketState state = super.asyncDispatch(status);
-        if (state == SocketState.OPEN && ((InternalNio2InputBuffer) 
getInputBuffer()).isPending()) {
+        if (state == SocketState.OPEN && socketWrapper.isReadPending()) {
             // Following async processing, a read is still pending, so
             // keep the processor associated
             return SocketState.LONG;
@@ -97,7 +97,7 @@ public class Http11Nio2Processor extends
     @Override
     protected void registerForEvent(boolean read, boolean write) {
         if (read) {
-            ((InternalNio2InputBuffer) 
getInputBuffer()).registerReadInterest();
+            socketWrapper.registerReadInterest();
         }
         if (write) {
             socketWrapper.registerWriteInterest();

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Thu 
Jan 15 09:21:08 2015
@@ -37,12 +37,9 @@ public class InternalAprInputBuffer exte
     private static final Log log =
         LogFactory.getLog(InternalAprInputBuffer.class);
 
-    // ----------------------------------------------------------- Constructors
 
+    // ----------------------------------------------------------- Constructors
 
-    /**
-     * Alternate constructor.
-     */
     public InternalAprInputBuffer(Request request, int headerBufferSize) {
         super(request, headerBufferSize);
         inputStreamInputBuffer = new SocketInputBuffer();
@@ -81,7 +78,7 @@ public class InternalAprInputBuffer exte
 
         wrapper = socketWrapper;
 
-        int bufLength = Math.max(headerBufferSize, 8192);
+        int bufLength = Math.max(headerBufferSize * 2, 8192);
         if (buf == null || buf.length < bufLength) {
             buf = new byte[bufLength];
         }
@@ -116,9 +113,7 @@ public class InternalAprInputBuffer exte
      * This class is an input buffer which will read its data from an input
      * stream.
      */
-    protected class SocketInputBuffer
-        implements InputBuffer {
-
+    protected class SocketInputBuffer implements InputBuffer {
 
         /**
          * Read bytes into the specified chunk.
@@ -136,7 +131,7 @@ public class InternalAprInputBuffer exte
             chunk.setBytes(buf, pos, length);
             pos = lastValid;
 
-            return (length);
+            return length;
         }
     }
 }

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Thu 
Jan 15 09:21:08 2015
@@ -18,14 +18,6 @@ package org.apache.coyote.http11;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.servlet.RequestDispatcher;
 
 import org.apache.coyote.InputBuffer;
 import org.apache.coyote.Request;
@@ -34,8 +26,6 @@ import org.apache.juli.logging.LogFactor
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapperBase;
 
 /**
@@ -46,87 +36,39 @@ public class InternalNio2InputBuffer ext
     private static final Log log =
             LogFactory.getLog(InternalNio2InputBuffer.class);
 
-    // ----------------------------------------------------------- Constructors
 
+    // ----------------------------------------------------------- Constructors
 
     public InternalNio2InputBuffer(Request request, int headerBufferSize) {
         super(request, headerBufferSize);
         inputStreamInputBuffer = new SocketInputBuffer();
     }
 
-    /**
-     * Underlying socket.
-     */
-    private SocketWrapperBase<Nio2Channel> socket;
-
-    /**
-     * Track write interest
-     */
-    protected volatile boolean interest = false;
-
-    /**
-     * The completion handler used for asynchronous read operations
-     */
-    private CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> 
completionHandler;
-
-    /**
-     * The associated endpoint.
-     */
-    protected AbstractEndpoint<Nio2Channel> endpoint = null;
-
-    /**
-     * Read pending flag.
-     */
-    protected volatile boolean readPending = false;
+    // ----------------------------------------------------- Instance Variables
 
-    /**
-     * Exception that occurred during writing.
-     */
-    protected IOException e = null;
+    private SocketWrapperBase<Nio2Channel> wrapper;
 
-    /**
-     * Track if the byte buffer is flipped
-     */
-    protected volatile boolean flipped = false;
 
     // --------------------------------------------------------- Public Methods
 
-    @Override
-    protected final Log getLog() {
-        return log;
-    }
-
-
     /**
      * Recycle the input buffer. This should be called when closing the
      * connection.
      */
     @Override
     public void recycle() {
+        wrapper = null;
         super.recycle();
-        socket = null;
-        readPending = false;
-        flipped = false;
-        interest = false;
-        e = null;
     }
 
 
-    /**
-     * End processing of current HTTP request.
-     * Note: All bytes of the current request should have been already
-     * consumed. This method only resets all the pointers so that we are ready
-     * to parse the next HTTP request.
-     */
+    // ------------------------------------------------------ Protected Methods
+
     @Override
-    public void nextRequest() {
-        super.nextRequest();
-        interest = false;
+    protected final Log getLog() {
+        return log;
     }
 
-    public boolean isPending() {
-        return readPending;
-    }
 
     // ------------------------------------------------------ Protected Methods
 
@@ -134,62 +76,17 @@ public class InternalNio2InputBuffer ext
     protected void init(SocketWrapperBase<Nio2Channel> socketWrapper,
             AbstractEndpoint<Nio2Channel> associatedEndpoint) throws 
IOException {
 
-        endpoint = associatedEndpoint;
-        socket = socketWrapper;
-        if (socket == null) {
-            // Socket has been closed in another thread
-            throw new IOException(sm.getString("iib.socketClosed"));
-        }
-        socketReadBufferSize =
-            socket.getSocket().getBufHandler().getReadBuffer().capacity();
+        wrapper = socketWrapper;
 
-        int bufLength = headerBufferSize + socketReadBufferSize;
+        int bufLength = headerBufferSize + 
wrapper.getSocket().getBufHandler().getReadBuffer().capacity();
         if (buf == null || buf.length < bufLength) {
             buf = new byte[bufLength];
         }
-
-        // Initialize the completion handler
-        this.completionHandler = new CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>>() {
-
-            @Override
-            public void completed(Integer nBytes, 
SocketWrapperBase<Nio2Channel> attachment) {
-                boolean notify = false;
-                synchronized (completionHandler) {
-                    if (nBytes.intValue() < 0) {
-                        failed(new 
EOFException(sm.getString("iib.eof.error")), attachment);
-                    } else {
-                        readPending = false;
-                        if ((request.getReadListener() == null || interest) && 
!Nio2Endpoint.isInline()) {
-                            interest = false;
-                            notify = true;
-                        }
-                    }
-                }
-                if (notify) {
-                    endpoint.processSocket(attachment, SocketStatus.OPEN_READ, 
false);
-                }
-            }
-
-            @Override
-            public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> 
attachment) {
-                if (exc instanceof IOException) {
-                    e = (IOException) exc;
-                } else {
-                    e = new IOException(exc);
-                }
-                attachment.setError(e);
-                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
-                readPending = false;
-                endpoint.processSocket(attachment, SocketStatus.OPEN_READ, 
true);
-            }
-        };
     }
 
     @Override
     protected boolean fill(boolean block) throws IOException, EOFException {
-        if (e != null) {
-            throw e;
-        }
+
         if (parsingHeader) {
             if (lastValid > headerBufferSize) {
                 throw new IllegalArgumentException
@@ -198,127 +95,24 @@ public class InternalNio2InputBuffer ext
         } else {
             lastValid = pos = end;
         }
-        // Now fill the internal buffer
-        int nRead = 0;
-        ByteBuffer byteBuffer = 
socket.getSocket().getBufHandler().getReadBuffer();
-        if (block) {
-            if (!flipped) {
-                byteBuffer.flip();
-                flipped = true;
-            }
-            int nBytes = byteBuffer.remaining();
-            // This case can happen when a blocking read follows a non blocking
-            // fill that completed asynchronously
-            if (nBytes > 0) {
-                expand(nBytes + pos);
-                byteBuffer.get(buf, pos, nBytes);
-                lastValid = pos + nBytes;
-                byteBuffer.clear();
-                flipped = false;
-                return true;
-            } else {
-                byteBuffer.clear();
-                flipped = false;
-                try {
-                    nRead = socket.getSocket().read(byteBuffer)
-                            .get(socket.getTimeout(), 
TimeUnit.MILLISECONDS).intValue();
-                } catch (ExecutionException e) {
-                    if (e.getCause() instanceof IOException) {
-                        throw (IOException) e.getCause();
-                    } else {
-                        throw new IOException(e);
-                    }
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                } catch (TimeoutException e) {
-                    throw new SocketTimeoutException();
-                }
-                if (nRead > 0) {
-                    if (!flipped) {
-                        byteBuffer.flip();
-                        flipped = true;
-                    }
-                    expand(nRead + pos);
-                    byteBuffer.get(buf, pos, nRead);
-                    lastValid = pos + nRead;
-                    return true;
-                } else if (nRead == -1) {
-                    //return false;
-                    throw new EOFException(sm.getString("iib.eof.error"));
-                } else {
-                    return false;
-                }
-            }
-        } else {
-            synchronized (completionHandler) {
-                if (!readPending) {
-                    if (!flipped) {
-                        byteBuffer.flip();
-                        flipped = true;
-                    }
-                    int nBytes = byteBuffer.remaining();
-                    if (nBytes > 0) {
-                        expand(nBytes + pos);
-                        byteBuffer.get(buf, pos, nBytes);
-                        lastValid = pos + nBytes;
-                        byteBuffer.clear();
-                        flipped = false;
-                    } else {
-                        byteBuffer.clear();
-                        flipped = false;
-                        readPending = true;
-                        Nio2Endpoint.startInline();
-                        socket.getSocket().read(byteBuffer, 
socket.getTimeout(),
-                                    TimeUnit.MILLISECONDS, socket, 
completionHandler);
-                        Nio2Endpoint.endInline();
-                        // Return the number of bytes that have been placed 
into the buffer
-                        if (!readPending) {
-                            // If the completion handler completed immediately
-                            if (!flipped) {
-                                byteBuffer.flip();
-                                flipped = true;
-                            }
-                            nBytes = byteBuffer.remaining();
-                            if (nBytes > 0) {
-                                expand(nBytes + pos);
-                                byteBuffer.get(buf, pos, nBytes);
-                                lastValid = pos + nBytes;
-                            }
-                            byteBuffer.clear();
-                            flipped = false;
-                        }
-                    }
-                    return (lastValid - pos) > 0;
-                } else {
-                    return false;
-                }
-            }
-        }
-    }
-
 
-    public void registerReadInterest() {
-        synchronized (completionHandler) {
-            if (readPending) {
-                interest = true;
-            } else {
-                // If no read is pending, notify
-                endpoint.processSocket(socket, SocketStatus.OPEN_READ, true);
-            }
+        int nRead = wrapper.read(block, buf, pos, buf.length - pos);
+        if (nRead > 0) {
+            lastValid = pos + nRead;
+            return true;
         }
+
+        return false;
     }
 
 
     // ------------------------------------- InputStreamInputBuffer Inner Class
 
-
     /**
      * This class is an input buffer which will read its data from an input
      * stream.
      */
-    protected class SocketInputBuffer
-        implements InputBuffer {
-
+    protected class SocketInputBuffer implements InputBuffer {
 
         /**
          * Read bytes into the specified chunk.
@@ -331,19 +125,12 @@ public class InternalNio2InputBuffer ext
                 if (!fill(true)) //read body, must be blocking, as the thread 
is inside the app
                     return -1;
             }
-            if (isBlocking()) {
-                int length = lastValid - pos;
-                chunk.setBytes(buf, pos, length);
-                pos = lastValid;
-                return (length);
-            } else {
-                synchronized (completionHandler) {
-                    int length = lastValid - pos;
-                    chunk.setBytes(buf, pos, length);
-                    pos = lastValid;
-                    return (length);
-                }
-            }
+
+            int length = lastValid - pos;
+            chunk.setBytes(buf, pos, length);
+            pos = lastValid;
+
+            return length;
         }
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Thu 
Jan 15 09:21:08 2015
@@ -80,7 +80,7 @@ public class InternalNioInputBuffer exte
 
         wrapper = socketWrapper;
 
-        int bufLength = Math.max(headerBufferSize, 8192);
+        int bufLength = headerBufferSize + 
wrapper.getSocket().getBufHandler().getReadBuffer().capacity();
         if (buf == null || buf.length < bufLength) {
             buf = new byte[bufLength];
         }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 15 
09:21:08 2015
@@ -2629,8 +2629,20 @@ public class AprEndpoint extends Abstrac
 
 
         @Override
+        public boolean isReadPending() {
+            return false;
+        }
+
+
+        @Override
+        public void registerReadInterest() {
+            regsiterForEvent(true, false);
+        }
+
+
+        @Override
         public void registerWriteInterest() {
-            ((AprEndpoint) 
getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true);
+            regsiterForEvent(false, true);
         }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 15 
09:21:08 2015
@@ -977,6 +977,9 @@ public class Nio2Endpoint extends Abstra
 
         @Override
         public int read(boolean block, byte[] b, int off, int len) throws 
IOException {
+            if (getError() != null) {
+                throw getError();
+            }
 
             if (log.isDebugEnabled()) {
                 log.debug("Socket: [" + this + "], block: [" + block + "], 
length: [" + len + "]");
@@ -1242,6 +1245,27 @@ public class Nio2Endpoint extends Abstra
             }
         }
 
+
+        @Override
+        public boolean isReadPending() {
+            synchronized (readCompletionHandler) {
+                return readPending;
+            }
+        }
+
+
+        @Override
+        public void registerReadInterest() {
+            synchronized (readCompletionHandler) {
+                if (readPending) {
+                    readInterest = true;
+                } else {
+                    // If no read is pending, notify
+                    getEndpoint().processSocket(this, SocketStatus.OPEN_READ, 
true);
+                }
+            }
+        }
+
 
         @Override
         public void registerWriteInterest() {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 15 
09:21:08 2015
@@ -1548,6 +1548,18 @@ public class NioEndpoint extends Abstrac
 
 
         @Override
+        public boolean isReadPending() {
+            return false;
+        }
+
+
+        @Override
+        public void registerReadInterest() {
+            getPoller().add(getSocket(), SelectionKey.OP_READ);
+        }
+
+
+        @Override
         public void registerWriteInterest() {
             getPoller().add(getSocket(), SelectionKey.OP_WRITE);
         }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 
15 09:21:08 2015
@@ -178,6 +178,8 @@ public abstract class SocketWrapperBase<
     }
     public Object getWriteThreadLock() { return writeThreadLock; }
 
+    public abstract boolean isReadPending();
+
     protected boolean hasMoreDataToFlush() {
         return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
         (!writeBufferFlipped && socketWriteBuffer.position() > 0);
@@ -498,6 +500,8 @@ public abstract class SocketWrapperBase<
         holder.getBuf().put(buf,offset,length);
     }
 
+    public abstract void registerReadInterest();
+
     public abstract void registerWriteInterest();
 
     public abstract void regsiterForEvent(boolean read, boolean write);



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to