Author: markt
Date: Mon Nov 10 16:47:25 2014
New Revision: 1637935

URL: http://svn.apache.org/r1637935
Log:
Push write methods down to SocketWrapper for NIO2

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java?rev=1637935&r1=1637934&r2=1637935&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java Mon Nov 10 16:47:25 2014
@@ -16,167 +16,27 @@
  */
 package org.apache.coyote.http11.upgrade;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
 import org.apache.tomcat.util.net.SocketWrapperBase;
 
 public class Nio2ServletOutputStream extends 
AbstractServletOutputStream<Nio2Channel> {
 
-    private final Nio2Channel channel;
-    private final int maxWrite;
-    private final CompletionHandler<Integer, ByteBuffer> completionHandler;
-    private final Semaphore writePending = new Semaphore(1);
-
     public Nio2ServletOutputStream(SocketWrapperBase<Nio2Channel> 
socketWrapper0,
             int asyncWriteBufferSize) {
         super(socketWrapper0, asyncWriteBufferSize);
-        channel = socketWrapper0.getSocket();
-        maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
-        this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
-            @Override
-            public void completed(Integer nBytes, ByteBuffer attachment) {
-                if (nBytes.intValue() < 0) {
-                    failed(new EOFException(), attachment);
-                } else if (attachment.hasRemaining()) {
-                    channel.write(attachment, socketWrapper.getTimeout(),
-                            TimeUnit.MILLISECONDS, attachment, 
completionHandler);
-                } else {
-                    writePending.release();
-                    if (!Nio2Endpoint.isInline()) {
-                        
socketWrapper.getEndpoint().processSocket(socketWrapper,
-                                SocketStatus.OPEN_WRITE, false);
-                    }
-                }
-            }
-            @Override
-            public void failed(Throwable exc, ByteBuffer attachment) {
-                socketWrapper.setError(true);
-                writePending.release();
-                if (exc instanceof AsynchronousCloseException) {
-                    // If already closed, don't call onError and close again
-                    return;
-                }
-                onError(exc);
-                socketWrapper.getEndpoint().processSocket(socketWrapper, 
SocketStatus.ERROR, true);
-            }
-        };
     }
 
     @Override
     protected int doWrite(boolean block, byte[] b, int off, int len)
             throws IOException {
-        int leftToWrite = len;
-        int count = 0;
-        int offset = off;
-
-        while (leftToWrite > 0) {
-            int writeThisLoop;
-            int writtenThisLoop;
-
-            if (leftToWrite > maxWrite) {
-                writeThisLoop = maxWrite;
-            } else {
-                writeThisLoop = leftToWrite;
-            }
-
-            writtenThisLoop = doWriteInternal(block, b, offset, writeThisLoop);
-            if (writtenThisLoop < 0) {
-                throw new EOFException();
-            }
-            count += writtenThisLoop;
-            if (!block && writePending.availablePermits() == 0) {
-                // Prevent concurrent writes in non blocking mode,
-                // leftover data has to be buffered
-                return count;
-            }
-            offset += writtenThisLoop;
-            leftToWrite -= writtenThisLoop;
-
-            if (writtenThisLoop < writeThisLoop) {
-                break;
-            }
-        }
-
-        return count;
-    }
-
-    private int doWriteInternal(boolean block, byte[] b, int off, int len)
-            throws IOException {
-        ByteBuffer buffer = channel.getBufHandler().getWriteBuffer();
-        int written = 0;
-        if (block) {
-            buffer.clear();
-            buffer.put(b, off, len);
-            buffer.flip();
-            try {
-                written = 
channel.write(buffer).get(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS).intValue();
-            } catch (ExecutionException e) {
-                if (e.getCause() instanceof IOException) {
-                    onError(e.getCause());
-                    throw (IOException) e.getCause();
-                } else {
-                    onError(e);
-                    throw new IOException(e);
-                }
-            } catch (InterruptedException e) {
-                onError(e);
-                throw new IOException(e);
-            } catch (TimeoutException e) {
-                SocketTimeoutException ex = new SocketTimeoutException();
-                onError(ex);
-                throw ex;
-            }
-        } else {
-            if (writePending.tryAcquire()) {
-                buffer.clear();
-                buffer.put(b, off, len);
-                buffer.flip();
-                Nio2Endpoint.startInline();
-                channel.write(buffer, socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS, buffer, completionHandler);
-                Nio2Endpoint.endInline();
-                written = len;
-            }
-        }
-        return written;
+        return ((Nio2SocketWrapper) socketWrapper).write(block, b, off, len);
     }
 
     @Override
     protected void doFlush() throws IOException {
-        try {
-            // Block until a possible non blocking write is done
-            if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS)) {
-                writePending.release();
-                channel.flush().get(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS);
-            } else {
-                throw new TimeoutException();
-            }
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof IOException) {
-                onError(e.getCause());
-                throw (IOException) e.getCause();
-            } else {
-                onError(e);
-                throw new IOException(e);
-            }
-        } catch (InterruptedException e) {
-            onError(e);
-            throw new IOException(e);
-        } catch (TimeoutException e) {
-            SocketTimeoutException ex = new SocketTimeoutException();
-            onError(ex);
-            throw ex;
-        }
+        ((Nio2SocketWrapper) socketWrapper).flush();
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1637935&r1=1637934&r2=1637935&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Nov 10 16:47:25 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -736,18 +737,25 @@ public class Nio2Endpoint extends Abstra
         private SendfileData sendfileData = null;
         private boolean upgradeInit = false;
 
-        private final CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>> completionHandler;
+        private final CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>> completionHandlerRead;
         private boolean flipped = false;
         private volatile boolean readPending = false;
         private volatile boolean interest = true;
 
+        private final int maxWrite;
+        private final CompletionHandler<Integer, ByteBuffer> 
completionHandlerWrite;
+        private final Semaphore writePending = new Semaphore(1);
+
+
         public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
             super(channel, endpoint);
-            this.completionHandler = new CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>>() {
+            maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
+
+            this.completionHandlerRead = new CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>>() {
                 @Override
                 public void completed(Integer nBytes, 
SocketWrapperBase<Nio2Channel> attachment) {
                     boolean notify = false;
-                    synchronized (completionHandler) {
+                    synchronized (completionHandlerRead) {
                         if (nBytes.intValue() < 0) {
                             failed(new EOFException(), attachment);
                         } else {
@@ -773,6 +781,34 @@ public class Nio2Endpoint extends Abstra
                     getEndpoint().processSocket(attachment, 
SocketStatus.ERROR, true);
                 }
             };
+
+            this.completionHandlerWrite = new CompletionHandler<Integer, 
ByteBuffer>() {
+                @Override
+                public void completed(Integer nBytes, ByteBuffer attachment) {
+                    if (nBytes.intValue() < 0) {
+                        failed(new EOFException(), attachment);
+                    } else if (attachment.hasRemaining()) {
+                        channel.write(attachment, getTimeout(),
+                                TimeUnit.MILLISECONDS, attachment, 
completionHandlerWrite);
+                    } else {
+                        writePending.release();
+                        if (!Nio2Endpoint.isInline()) {
+                            
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, 
false);
+                        }
+                    }
+                }
+                @Override
+                public void failed(Throwable exc, ByteBuffer attachment) {
+                    setError(true);
+                    writePending.release();
+                    if (exc instanceof AsynchronousCloseException) {
+                        // If already closed, don't call onError and close 
again
+                        return;
+                    }
+                    getEndpoint().processSocket(Nio2SocketWrapper.this, 
SocketStatus.ERROR, true);
+                }
+            };
+
         }
 
         @Override
@@ -808,7 +844,7 @@ public class Nio2Endpoint extends Abstra
 
         @Override
         public boolean isReady() throws IOException {
-            synchronized (completionHandler) {
+            synchronized (completionHandlerRead) {
                 if (readPending) {
                     interest = true;
                     return false;
@@ -843,7 +879,7 @@ public class Nio2Endpoint extends Abstra
         @Override
         public int read(boolean block, byte[] b, int off, int len) throws 
IOException {
 
-            synchronized (completionHandler) {
+            synchronized (completionHandlerRead) {
                 if (readPending) {
                     return 0;
                 }
@@ -940,7 +976,7 @@ public class Nio2Endpoint extends Abstra
                 flipped = false;
                 Nio2Endpoint.startInline();
                 getSocket().read(readBuffer, getTimeout(), 
TimeUnit.MILLISECONDS,
-                        this, completionHandler);
+                        this, completionHandlerRead);
                 Nio2Endpoint.endInline();
                 if (!readPending) {
                     nRead = readBuffer.position();
@@ -949,8 +985,106 @@ public class Nio2Endpoint extends Abstra
             return nRead;
         }
 
+
+        public int write(boolean block, byte[] b, int off, int len) throws 
IOException {
+            int leftToWrite = len;
+            int count = 0;
+            int offset = off;
+
+            while (leftToWrite > 0) {
+                int writeThisLoop;
+                int writtenThisLoop;
+
+                if (leftToWrite > maxWrite) {
+                    writeThisLoop = maxWrite;
+                } else {
+                    writeThisLoop = leftToWrite;
+                }
+
+                writtenThisLoop = writeInternal(block, b, offset, 
writeThisLoop);
+                if (writtenThisLoop < 0) {
+                    throw new EOFException();
+                }
+                count += writtenThisLoop;
+                if (!block && writePending.availablePermits() == 0) {
+                    // Prevent concurrent writes in non blocking mode,
+                    // leftover data has to be buffered
+                    return count;
+                }
+                offset += writtenThisLoop;
+                leftToWrite -= writtenThisLoop;
+
+                if (writtenThisLoop < writeThisLoop) {
+                    break;
+                }
+            }
+
+            return count;
+        }
+
+
+        private int writeInternal(boolean block, byte[] b, int off, int len)
+                throws IOException {
+            ByteBuffer buffer = getSocket().getBufHandler().getWriteBuffer();
+            int written = 0;
+            if (block) {
+                buffer.clear();
+                buffer.put(b, off, len);
+                buffer.flip();
+                try {
+                    written = getSocket().write(buffer).get(getTimeout(), 
TimeUnit.MILLISECONDS).intValue();
+                } catch (ExecutionException e) {
+                    if (e.getCause() instanceof IOException) {
+                        throw (IOException) e.getCause();
+                    } else {
+                        throw new IOException(e);
+                    }
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                } catch (TimeoutException e) {
+                    SocketTimeoutException ex = new SocketTimeoutException();
+                    throw ex;
+                }
+            } else {
+                if (writePending.tryAcquire()) {
+                    buffer.clear();
+                    buffer.put(b, off, len);
+                    buffer.flip();
+                    Nio2Endpoint.startInline();
+                    getSocket().write(buffer, getTimeout(), 
TimeUnit.MILLISECONDS, buffer, completionHandlerWrite);
+                    Nio2Endpoint.endInline();
+                    written = len;
+                }
+            }
+            return written;
+        }
+
+
+        public void flush() throws IOException {
+            try {
+                // Block until a possible non blocking write is done
+                if (writePending.tryAcquire(getTimeout(), 
TimeUnit.MILLISECONDS)) {
+                    writePending.release();
+                    getSocket().flush().get(getTimeout(), 
TimeUnit.MILLISECONDS);
+                } else {
+                    throw new TimeoutException();
+                }
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw new IOException(e);
+                }
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            } catch (TimeoutException e) {
+                SocketTimeoutException ex = new SocketTimeoutException();
+                throw ex;
+            }
+        }
     }
 
+
     // ------------------------------------------------ Application Buffer 
Handler
     public static class NioBufferHandler implements ApplicationBufferHandler {
         private ByteBuffer readbuf = null;



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to