Author: markt Date: Mon Nov 10 16:46:03 2014 New Revision: 1637924 URL: http://svn.apache.org/r1637924 Log: Push read methods down into Nio2SocketWrapper
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java?rev=1637924&r1=1637923&r2=1637924&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java Mon Nov 10 16:46:03 2014 @@ -16,208 +16,32 @@ */ package org.apache.coyote.http11.upgrade; -import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.tomcat.util.net.Nio2Channel; -import org.apache.tomcat.util.net.Nio2Endpoint; -import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper; import org.apache.tomcat.util.net.SocketWrapperBase; public class Nio2ServletInputStream extends AbstractServletInputStream { private final SocketWrapperBase<Nio2Channel> wrapper; - private final Nio2Channel channel; - private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler; - private boolean flipped = false; - private volatile boolean readPending = false; - private volatile boolean interest = true; public Nio2ServletInputStream(SocketWrapperBase<Nio2Channel> wrapper) { this.wrapper = wrapper; - this.channel = wrapper.getSocket(); - this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { - @Override - public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) { - boolean notify = false; - synchronized (completionHandler) { - if (nBytes.intValue() < 0) { - failed(new EOFException(), attachment); - } else { - readPending = false; - if (interest && !Nio2Endpoint.isInline()) { - interest = false; - notify = true; - } - } - } - if (notify) { - wrapper.getEndpoint().processSocket(attachment, SocketStatus.OPEN_READ, false); - } - } - @Override - public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) { - attachment.setError(true); - readPending = false; - if (exc instanceof AsynchronousCloseException) { - // If already closed, don't call onError and close again - return; - } - onError(exc); - wrapper.getEndpoint().processSocket(attachment, SocketStatus.ERROR, true); - } - }; } @Override protected boolean doIsReady() throws IOException { - synchronized (completionHandler) { - if (readPending) { - interest = true; - return false; - } - ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer(); - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - if (readBuffer.remaining() > 0) { - return true; - } - - readBuffer.clear(); - flipped = false; - int nRead = fillReadBuffer(false); - - boolean isReady = nRead > 0; - if (isReady) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - } else { - interest = true; - } - return isReady; - } + return ((Nio2SocketWrapper) wrapper).doIsReady(); } @Override - protected int doRead(boolean block, byte[] b, int off, int len) - throws IOException { - - synchronized (completionHandler) { - if (readPending) { - return 0; - } - - ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer(); - - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - int remaining = readBuffer.remaining(); - // Is there enough data in the read buffer to satisfy this request? - if (remaining >= len) { - readBuffer.get(b, off, len); - return len; - } - - // Copy what data there is in the read buffer to the byte array - int leftToWrite = len; - int newOffset = off; - if (remaining > 0) { - readBuffer.get(b, off, remaining); - leftToWrite -= remaining; - newOffset += remaining; - } - - // Fill the read buffer as best we can - readBuffer.clear(); - flipped = false; - int nRead = fillReadBuffer(block); - - // Full as much of the remaining byte array as possible with the data - // that was just read - if (nRead > 0) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - if (nRead > leftToWrite) { - readBuffer.get(b, newOffset, leftToWrite); - leftToWrite = 0; - } else { - readBuffer.get(b, newOffset, nRead); - leftToWrite -= nRead; - } - } else if (nRead == 0) { - if (block) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - } - } else if (nRead == -1) { - throw new EOFException(); - } - - return len - leftToWrite; - } + protected int doRead(boolean block, byte[] b, int off, int len) throws IOException { + return ((Nio2SocketWrapper) wrapper).doRead(block, b, off, len); } @Override protected void doClose() throws IOException { - channel.close(); - } - - private int fillReadBuffer(boolean block) throws IOException { - ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer(); - int nRead = 0; - if (block) { - readPending = true; - readBuffer.clear(); - flipped = false; - try { - nRead = channel.read(readBuffer) - .get(wrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue(); - readPending = false; - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - onError(e.getCause()); - throw (IOException) e.getCause(); - } else { - onError(e); - throw new IOException(e); - } - } catch (InterruptedException e) { - onError(e); - throw new IOException(e); - } catch (TimeoutException e) { - SocketTimeoutException ex = new SocketTimeoutException(); - onError(ex); - throw ex; - } - } else { - readPending = true; - readBuffer.clear(); - flipped = false; - Nio2Endpoint.startInline(); - channel.read(readBuffer, - wrapper.getTimeout(), TimeUnit.MILLISECONDS, wrapper, completionHandler); - Nio2Endpoint.endInline(); - if (!readPending) { - nRead = readBuffer.position(); - } - } - return nRead; + ((Nio2SocketWrapper) wrapper).doClose(); } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1637924&r1=1637923&r2=1637924&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Nov 10 16:46:03 2014 @@ -22,18 +22,22 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -732,8 +736,43 @@ public class Nio2Endpoint extends Abstra private SendfileData sendfileData = null; private boolean upgradeInit = false; + private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler; + private boolean flipped = false; + private volatile boolean readPending = false; + private volatile boolean interest = true; + public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) { super(channel, endpoint); + this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { + @Override + public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) { + boolean notify = false; + synchronized (completionHandler) { + if (nBytes.intValue() < 0) { + failed(new EOFException(), attachment); + } else { + readPending = false; + if (interest && !Nio2Endpoint.isInline()) { + interest = false; + notify = true; + } + } + } + if (notify) { + getEndpoint().processSocket(attachment, SocketStatus.OPEN_READ, false); + } + } + @Override + public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) { + attachment.setError(true); + readPending = false; + if (exc instanceof AsynchronousCloseException) { + // If already closed, don't call onError and close again + return; + } + getEndpoint().processSocket(attachment, SocketStatus.ERROR, true); + } + }; } @Override @@ -766,6 +805,144 @@ public class Nio2Endpoint extends Abstra public void setSendfileData(SendfileData sf) { this.sendfileData = sf; } public SendfileData getSendfileData() { return this.sendfileData; } + public boolean doIsReady() throws IOException { + synchronized (completionHandler) { + if (readPending) { + interest = true; + return false; + } + ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); + if (!flipped) { + readBuffer.flip(); + flipped = true; + } + if (readBuffer.remaining() > 0) { + return true; + } + + readBuffer.clear(); + flipped = false; + int nRead = fillReadBuffer(false); + + boolean isReady = nRead > 0; + if (isReady) { + if (!flipped) { + readBuffer.flip(); + flipped = true; + } + } else { + interest = true; + } + return isReady; + } + } + + public int doRead(boolean block, byte[] b, int off, int len) + throws IOException { + + synchronized (completionHandler) { + if (readPending) { + return 0; + } + + ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); + + if (!flipped) { + readBuffer.flip(); + flipped = true; + } + int remaining = readBuffer.remaining(); + // Is there enough data in the read buffer to satisfy this request? + if (remaining >= len) { + readBuffer.get(b, off, len); + return len; + } + + // Copy what data there is in the read buffer to the byte array + int leftToWrite = len; + int newOffset = off; + if (remaining > 0) { + readBuffer.get(b, off, remaining); + leftToWrite -= remaining; + newOffset += remaining; + } + + // Fill the read buffer as best we can + readBuffer.clear(); + flipped = false; + int nRead = fillReadBuffer(block); + + // Full as much of the remaining byte array as possible with the data + // that was just read + if (nRead > 0) { + if (!flipped) { + readBuffer.flip(); + flipped = true; + } + if (nRead > leftToWrite) { + readBuffer.get(b, newOffset, leftToWrite); + leftToWrite = 0; + } else { + readBuffer.get(b, newOffset, nRead); + leftToWrite -= nRead; + } + } else if (nRead == 0) { + if (block) { + if (!flipped) { + readBuffer.flip(); + flipped = true; + } + } + } else if (nRead == -1) { + throw new EOFException(); + } + + return len - leftToWrite; + } + } + + public void doClose() throws IOException { + getSocket().close(); + } + + private int fillReadBuffer(boolean block) throws IOException { + ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); + int nRead = 0; + if (block) { + readPending = true; + readBuffer.clear(); + flipped = false; + try { + nRead = getSocket().read(readBuffer) + .get(getTimeout(), TimeUnit.MILLISECONDS).intValue(); + readPending = false; + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(e); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (TimeoutException e) { + SocketTimeoutException ex = new SocketTimeoutException(); + throw ex; + } + } else { + readPending = true; + readBuffer.clear(); + flipped = false; + Nio2Endpoint.startInline(); + getSocket().read(readBuffer, getTimeout(), TimeUnit.MILLISECONDS, + this, completionHandler); + Nio2Endpoint.endInline(); + if (!readPending) { + nRead = readBuffer.position(); + } + } + return nRead; + } + } // ------------------------------------------------ Application Buffer Handler --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org