Author: markt Date: Mon Nov 10 16:47:25 2014 New Revision: 1637935 URL: http://svn.apache.org/r1637935 Log: Push write methods down to SocketWrapper for NIO2
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java?rev=1637935&r1=1637934&r2=1637935&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java Mon Nov 10 16:47:25 2014 @@ -16,167 +16,27 @@ */ package org.apache.coyote.http11.upgrade; -import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.tomcat.util.net.Nio2Channel; -import org.apache.tomcat.util.net.Nio2Endpoint; -import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper; import org.apache.tomcat.util.net.SocketWrapperBase; public class Nio2ServletOutputStream extends AbstractServletOutputStream<Nio2Channel> { - private final Nio2Channel channel; - private final int maxWrite; - private final CompletionHandler<Integer, ByteBuffer> completionHandler; - private final Semaphore writePending = new Semaphore(1); - public Nio2ServletOutputStream(SocketWrapperBase<Nio2Channel> socketWrapper0, int asyncWriteBufferSize) { super(socketWrapper0, asyncWriteBufferSize); - channel = socketWrapper0.getSocket(); - maxWrite = channel.getBufHandler().getWriteBuffer().capacity(); - this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() { - @Override - public void completed(Integer nBytes, ByteBuffer attachment) { - if (nBytes.intValue() < 0) { - failed(new EOFException(), attachment); - } else if (attachment.hasRemaining()) { - channel.write(attachment, socketWrapper.getTimeout(), - TimeUnit.MILLISECONDS, attachment, completionHandler); - } else { - writePending.release(); - if (!Nio2Endpoint.isInline()) { - socketWrapper.getEndpoint().processSocket(socketWrapper, - SocketStatus.OPEN_WRITE, false); - } - } - } - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - socketWrapper.setError(true); - writePending.release(); - if (exc instanceof AsynchronousCloseException) { - // If already closed, don't call onError and close again - return; - } - onError(exc); - socketWrapper.getEndpoint().processSocket(socketWrapper, SocketStatus.ERROR, true); - } - }; } @Override protected int doWrite(boolean block, byte[] b, int off, int len) throws IOException { - int leftToWrite = len; - int count = 0; - int offset = off; - - while (leftToWrite > 0) { - int writeThisLoop; - int writtenThisLoop; - - if (leftToWrite > maxWrite) { - writeThisLoop = maxWrite; - } else { - writeThisLoop = leftToWrite; - } - - writtenThisLoop = doWriteInternal(block, b, offset, writeThisLoop); - if (writtenThisLoop < 0) { - throw new EOFException(); - } - count += writtenThisLoop; - if (!block && writePending.availablePermits() == 0) { - // Prevent concurrent writes in non blocking mode, - // leftover data has to be buffered - return count; - } - offset += writtenThisLoop; - leftToWrite -= writtenThisLoop; - - if (writtenThisLoop < writeThisLoop) { - break; - } - } - - return count; - } - - private int doWriteInternal(boolean block, byte[] b, int off, int len) - throws IOException { - ByteBuffer buffer = channel.getBufHandler().getWriteBuffer(); - int written = 0; - if (block) { - buffer.clear(); - buffer.put(b, off, len); - buffer.flip(); - try { - written = channel.write(buffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue(); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - onError(e.getCause()); - throw (IOException) e.getCause(); - } else { - onError(e); - throw new IOException(e); - } - } catch (InterruptedException e) { - onError(e); - throw new IOException(e); - } catch (TimeoutException e) { - SocketTimeoutException ex = new SocketTimeoutException(); - onError(ex); - throw ex; - } - } else { - if (writePending.tryAcquire()) { - buffer.clear(); - buffer.put(b, off, len); - buffer.flip(); - Nio2Endpoint.startInline(); - channel.write(buffer, socketWrapper.getTimeout(), TimeUnit.MILLISECONDS, buffer, completionHandler); - Nio2Endpoint.endInline(); - written = len; - } - } - return written; + return ((Nio2SocketWrapper) socketWrapper).write(block, b, off, len); } @Override protected void doFlush() throws IOException { - try { - // Block until a possible non blocking write is done - if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) { - writePending.release(); - channel.flush().get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS); - } else { - throw new TimeoutException(); - } - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - onError(e.getCause()); - throw (IOException) e.getCause(); - } else { - onError(e); - throw new IOException(e); - } - } catch (InterruptedException e) { - onError(e); - throw new IOException(e); - } catch (TimeoutException e) { - SocketTimeoutException ex = new SocketTimeoutException(); - onError(ex); - throw ex; - } + ((Nio2SocketWrapper) socketWrapper).flush(); } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1637935&r1=1637934&r2=1637935&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Nov 10 16:47:25 2014 @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -736,18 +737,25 @@ public class Nio2Endpoint extends Abstra private SendfileData sendfileData = null; private boolean upgradeInit = false; - private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler; + private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandlerRead; private boolean flipped = false; private volatile boolean readPending = false; private volatile boolean interest = true; + private final int maxWrite; + private final CompletionHandler<Integer, ByteBuffer> completionHandlerWrite; + private final Semaphore writePending = new Semaphore(1); + + public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) { super(channel, endpoint); - this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { + maxWrite = channel.getBufHandler().getWriteBuffer().capacity(); + + this.completionHandlerRead = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { @Override public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) { boolean notify = false; - synchronized (completionHandler) { + synchronized (completionHandlerRead) { if (nBytes.intValue() < 0) { failed(new EOFException(), attachment); } else { @@ -773,6 +781,34 @@ public class Nio2Endpoint extends Abstra getEndpoint().processSocket(attachment, SocketStatus.ERROR, true); } }; + + this.completionHandlerWrite = new CompletionHandler<Integer, ByteBuffer>() { + @Override + public void completed(Integer nBytes, ByteBuffer attachment) { + if (nBytes.intValue() < 0) { + failed(new EOFException(), attachment); + } else if (attachment.hasRemaining()) { + channel.write(attachment, getTimeout(), + TimeUnit.MILLISECONDS, attachment, completionHandlerWrite); + } else { + writePending.release(); + if (!Nio2Endpoint.isInline()) { + getEndpoint().processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false); + } + } + } + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + setError(true); + writePending.release(); + if (exc instanceof AsynchronousCloseException) { + // If already closed, don't call onError and close again + return; + } + getEndpoint().processSocket(Nio2SocketWrapper.this, SocketStatus.ERROR, true); + } + }; + } @Override @@ -808,7 +844,7 @@ public class Nio2Endpoint extends Abstra @Override public boolean isReady() throws IOException { - synchronized (completionHandler) { + synchronized (completionHandlerRead) { if (readPending) { interest = true; return false; @@ -843,7 +879,7 @@ public class Nio2Endpoint extends Abstra @Override public int read(boolean block, byte[] b, int off, int len) throws IOException { - synchronized (completionHandler) { + synchronized (completionHandlerRead) { if (readPending) { return 0; } @@ -940,7 +976,7 @@ public class Nio2Endpoint extends Abstra flipped = false; Nio2Endpoint.startInline(); getSocket().read(readBuffer, getTimeout(), TimeUnit.MILLISECONDS, - this, completionHandler); + this, completionHandlerRead); Nio2Endpoint.endInline(); if (!readPending) { nRead = readBuffer.position(); @@ -949,8 +985,106 @@ public class Nio2Endpoint extends Abstra return nRead; } + + public int write(boolean block, byte[] b, int off, int len) throws IOException { + int leftToWrite = len; + int count = 0; + int offset = off; + + while (leftToWrite > 0) { + int writeThisLoop; + int writtenThisLoop; + + if (leftToWrite > maxWrite) { + writeThisLoop = maxWrite; + } else { + writeThisLoop = leftToWrite; + } + + writtenThisLoop = writeInternal(block, b, offset, writeThisLoop); + if (writtenThisLoop < 0) { + throw new EOFException(); + } + count += writtenThisLoop; + if (!block && writePending.availablePermits() == 0) { + // Prevent concurrent writes in non blocking mode, + // leftover data has to be buffered + return count; + } + offset += writtenThisLoop; + leftToWrite -= writtenThisLoop; + + if (writtenThisLoop < writeThisLoop) { + break; + } + } + + return count; + } + + + private int writeInternal(boolean block, byte[] b, int off, int len) + throws IOException { + ByteBuffer buffer = getSocket().getBufHandler().getWriteBuffer(); + int written = 0; + if (block) { + buffer.clear(); + buffer.put(b, off, len); + buffer.flip(); + try { + written = getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue(); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(e); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (TimeoutException e) { + SocketTimeoutException ex = new SocketTimeoutException(); + throw ex; + } + } else { + if (writePending.tryAcquire()) { + buffer.clear(); + buffer.put(b, off, len); + buffer.flip(); + Nio2Endpoint.startInline(); + getSocket().write(buffer, getTimeout(), TimeUnit.MILLISECONDS, buffer, completionHandlerWrite); + Nio2Endpoint.endInline(); + written = len; + } + } + return written; + } + + + public void flush() throws IOException { + try { + // Block until a possible non blocking write is done + if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) { + writePending.release(); + getSocket().flush().get(getTimeout(), TimeUnit.MILLISECONDS); + } else { + throw new TimeoutException(); + } + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(e); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (TimeoutException e) { + SocketTimeoutException ex = new SocketTimeoutException(); + throw ex; + } + } } + // ------------------------------------------------ Application Buffer Handler public static class NioBufferHandler implements ApplicationBufferHandler { private ByteBuffer readbuf = null; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org