Author: markt
Date: Thu Sep 5 22:22:40 2013
New Revision: 1520443
URL: http://svn.apache.org/r1520443
Log: Implement Servlet 3.1 non-blocking writes for AJP. Writes are fully non-blocking, both between and within AJP messages.
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520443&r1=1520442&r2=1520443&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep 5 22:22:40 2013 @@ -20,9 +20,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetAddress; +import java.nio.ByteBuffer; import java.security.NoSuchProviderException; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.util.Iterator; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.http.HttpUpgradeHandler; @@ -164,6 +167,7 @@ public abstract class AbstractAjpProcess */ private int responseMsgPos = -1; + /** * Body message. */ @@ -177,6 +181,22 @@ public abstract class AbstractAjpProcess /** + * The max size of the buffered write buffer + */ + private int bufferedWriteSize = 64*1024; //64k default write buffer + + + /** + * For "non-blocking" writes use an external set of buffers. Although the + * API only allows one non-blocking write at a time, due to buffering and + * the possible need to write HTTP headers, there may be more than one write + * to the OutputBuffer. + */ + private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = + new LinkedBlockingDeque<>(); + + + /** * Error flag. 
*/ protected boolean error = false; @@ -1470,6 +1490,94 @@ public abstract class AbstractAjpProcess } } + private void writeData(ByteChunk chunk) throws IOException { + // Prevent timeout + socketWrapper.access(); + + boolean blocking = (response.getWriteListener() == null); + if (!blocking) { + flushBufferedData(); + } + + int len = chunk.getLength(); + int off = 0; + + // Write this chunk + while (responseMsgPos == -1 && len > 0) { + int thisTime = len; + if (thisTime > outputMaxChunkSize) { + thisTime = outputMaxChunkSize; + } + responseMessage.reset(); + responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); + responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime); + responseMessage.end(); + writeResponseMessage(blocking); + + len -= thisTime; + off += thisTime; + } + + bytesWritten += off; + + if (chunk.getLength() > 0) { + // Add this chunk to the buffer + addToBuffers(chunk.getBuffer(), off, len); + } + } + + + private void addToBuffers(byte[] buf, int offset, int length) { + ByteBufferHolder holder = bufferedWrites.peekLast(); + if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) { + ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); + holder = new ByteBufferHolder(buffer, false); + bufferedWrites.add(holder); + } + holder.getBuf().put(buf, offset, length); + } + + + private void flushBufferedData() throws IOException { + + if (responseMsgPos > -1) { + // Must be using non-blocking IO + // Partially written response message. Try and complete it. 
+ writeResponseMessage(false); + } + + while (responseMsgPos == -1 && bufferedWrites.size() > 0) { + // Try and write any remaining buffer data + Iterator<ByteBufferHolder> holders = bufferedWrites.iterator(); + ByteBufferHolder holder = holders.next(); + holder.flip(); + ByteBuffer buffer = holder.getBuf(); + int initialBufferSize = buffer.remaining(); + while (responseMsgPos == -1 && buffer.remaining() > 0) { + transferToResponseMsg(buffer); + writeResponseMessage(false); + } + bytesWritten += (initialBufferSize - buffer.remaining()); + if (buffer.remaining() == 0) { + holders.remove(); + } + } + } + + + private void transferToResponseMsg(ByteBuffer buffer) { + + int thisTime = buffer.remaining(); + if (thisTime > outputMaxChunkSize) { + thisTime = outputMaxChunkSize; + } + + responseMessage.reset(); + responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); + buffer.get(responseMessage.getBuffer(), responseMessage.pos, thisTime); + responseMessage.end(); + } + private void writeResponseMessage(boolean block) throws IOException { int len = responseMessage.getLen(); @@ -1551,25 +1659,7 @@ public abstract class AbstractAjpProcess } if (!swallowResponse) { - int len = chunk.getLength(); - // 4 - hardcoded, byte[] marshaling overhead - int off = 0; - while (len > 0) { - int thisTime = len; - if (thisTime > outputMaxChunkSize) { - thisTime = outputMaxChunkSize; - } - len -= thisTime; - responseMessage.reset(); - responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); - responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime); - responseMessage.end(); - writeResponseMessage(true); - - off += thisTime; - } - - bytesWritten += chunk.getLength(); + writeData(chunk); } return chunk.getLength(); } @@ -1579,4 +1669,47 @@ public abstract class AbstractAjpProcess return bytesWritten; } } + + + protected static class ByteBufferHolder { + private final ByteBuffer buf; + private final AtomicBoolean flipped; + public 
ByteBufferHolder(ByteBuffer buf, boolean flipped) { + this.buf = buf; + this.flipped = new AtomicBoolean(flipped); + } + public ByteBuffer getBuf() { + return buf; + } + public boolean isFlipped() { + return flipped.get(); + } + + public boolean flip() { + if (flipped.compareAndSet(false, true)) { + buf.flip(); + return true; + } else { + return false; + } + } + + public boolean hasData() { + if (flipped.get()) { + return buf.remaining()>0; + } else { + return buf.position()>0; + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(super.toString()); + builder.append("[flipped="); + builder.append(isFlipped()?"true, remaining=" : "false, position="); + builder.append(isFlipped()? buf.remaining(): buf.position()); + builder.append("]"); + return builder.toString(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org