Author: markt Date: Thu May 2 20:59:05 2013 New Revision: 1478542 URL: http://svn.apache.org/r1478542 Log: Copy buffering for non-blocking writes from NIO to APR and align code
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu May 2 20:59:05 2013 @@ -62,7 +62,7 @@ public abstract class AbstractOutputBuff /** * The buffer used for header composition. */ - protected byte[] buf; + protected byte[] headerBuffer; /** @@ -380,7 +380,7 @@ public abstract class AbstractOutputBuff // Write protocol name write(Constants.HTTP_11_BYTES); - buf[pos++] = Constants.SP; + headerBuffer[pos++] = Constants.SP; // Write status code int status = response.getStatus(); @@ -398,7 +398,7 @@ public abstract class AbstractOutputBuff write(status); } - buf[pos++] = Constants.SP; + headerBuffer[pos++] = Constants.SP; // Write message String message = null; @@ -418,15 +418,15 @@ public abstract class AbstractOutputBuff new PrivilegedAction<Void>(){ @Override public Void run(){ - buf[pos++] = Constants.CR; - buf[pos++] = Constants.LF; + headerBuffer[pos++] = Constants.CR; + headerBuffer[pos++] = Constants.LF; return null; } } ); } else { - buf[pos++] = Constants.CR; - buf[pos++] = Constants.LF; + headerBuffer[pos++] = Constants.CR; + headerBuffer[pos++] = Constants.LF; } } @@ -441,11 +441,11 @@ public abstract class AbstractOutputBuff public void sendHeader(MessageBytes name, MessageBytes value) { write(name); - buf[pos++] = Constants.COLON; - buf[pos++] = Constants.SP; + headerBuffer[pos++] = Constants.COLON; + headerBuffer[pos++] = Constants.SP; write(value); - buf[pos++] = Constants.CR; - buf[pos++] = Constants.LF; + headerBuffer[pos++] = Constants.CR; + headerBuffer[pos++] = Constants.LF; } @@ -455,8 +455,8 @@ public abstract class AbstractOutputBuff */ public void endHeaders() { - buf[pos++] = Constants.CR; - buf[pos++] = Constants.LF; + headerBuffer[pos++] = Constants.CR; + headerBuffer[pos++] = Constants.LF; } @@ -495,7 +495,7 @@ public abstract class AbstractOutputBuff // Writing the byte chunk to the output buffer int length = bc.getLength(); checkLengthBeforeWrite(length); - System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length); + System.arraycopy(bc.getBytes(), bc.getStart(), headerBuffer, pos, length); pos = pos + length; } @@ -523,7 +523,7 @@ public abstract class AbstractOutputBuff if (((c <= 31) && (c != 9)) || c == 127 || c > 255) { c = ' '; } - buf[pos++] = (byte) c; + headerBuffer[pos++] = (byte) c; } } @@ -540,7 +540,7 @@ public abstract class AbstractOutputBuff checkLengthBeforeWrite(b.length); // Writing the byte chunk to the output buffer - System.arraycopy(b, 0, buf, pos, b.length); + System.arraycopy(b, 0, headerBuffer, pos, b.length); pos = pos + b.length; } @@ -570,7 +570,7 @@ public abstract class AbstractOutputBuff if (((c <= 31) && (c != 9)) || c == 127 || c > 255) { c = ' '; } - buf[pos++] = (byte) c; + headerBuffer[pos++] = (byte) c; } } @@ -595,7 +595,7 @@ public abstract class AbstractOutputBuff * requested number of bytes. */ private void checkLengthBeforeWrite(int length) { - if (pos + length > buf.length) { + if (pos + length > headerBuffer.length) { throw new HeadersTooLargeException( sm.getString("iob.responseheadertoolarge.error")); } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu May 2 20:59:05 2013 @@ -19,6 +19,7 @@ package org.apache.coyote.http11; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Response; @@ -35,7 +36,6 @@ import org.apache.tomcat.util.net.Socket */ public class InternalAprOutputBuffer extends AbstractOutputBuffer<Long> { - // ----------------------------------------------------------- Constructors /** @@ -45,7 +45,7 @@ public class InternalAprOutputBuffer ext this.response = response; - buf = new byte[headerBufferSize]; + headerBuffer = new byte[headerBufferSize]; if (headerBufferSize < (8 * 1024)) { bbuf = ByteBuffer.allocateDirect(6 * 1500); } else { @@ -141,27 +141,108 @@ public class InternalAprOutputBuffer ext if (pos > 0) { // Sending the response header buffer - bbuf.put(buf, 0, pos); + bbuf.put(headerBuffer, 0, pos); + } + + } + + + private synchronized void addToBB(byte[] buf, int offset, int length) + throws IOException { + + if (length == 0) return; + + // Try to flush any data in the socket's write buffer first + boolean dataLeft = flushBuffer(isBlocking()); + + // Keep writing until all the data is written or a non-blocking write + // leaves data in the buffer + while (!dataLeft && length > 0) { + int thisTime = length; + if (bbuf.position() == bbuf.capacity()) { + flushBuffer(isBlocking()); + } + if (thisTime > bbuf.capacity() - bbuf.position()) { + thisTime = bbuf.capacity() - bbuf.position(); + } + bbuf.put(buf, offset, thisTime); + length = length - thisTime; + offset = offset + thisTime; + } + + // TODO: Review how to update the SocketWrapper's last accessed time + + if (!isBlocking() && length>0) { + // Buffer the remaining data + addToBuffers(buf, offset, length); } } + private void addToBuffers(byte[] buf, int offset, int length) { + ByteBufferHolder holder = bufferedWrites.peekLast(); + if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { + ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); + holder = new ByteBufferHolder(buffer,false); + bufferedWrites.add(holder); + } + holder.getBuf().put(buf,offset,length); + } + + /** * Callback to write data from the buffer. */ @Override protected boolean flushBuffer(boolean block) throws IOException { - // TODO: Non-blocking IO not yet implemented so always block parameter - // ignored - if (bbuf.position() > 0) { - if (Socket.sendbb(socket, 0, bbuf.position()) < 0) { - throw new IOException(); + + // TODO: Review how to update the SocketWrapper's last accessed time + + boolean dataLeft = hasMoreDataToFlush(); + + if (dataLeft) { + writeToSocket(); + } + + if (!dataLeft && bufferedWrites!=null) { + Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); + while (!hasMoreDataToFlush() && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { + transfer(buffer.getBuf(), bbuf); + if (buffer.getBuf().remaining() == 0) { + bufIter.remove(); + } + writeToSocket(); + //here we must break if we didn't finish the write + } } - bbuf.clear(); } - // TODO: Non-blocking IO not yet implemented so always returns false - return false; + + dataLeft = hasMoreDataToFlush(); + + return hasMoreDataToFlush(); + } + + + private void writeToSocket() throws IOException { + // TODO Implement non-blocking writes + if (Socket.sendbb(socket, 0, bbuf.position()) < 0) { + throw new IOException(); + } + bbuf.clear(); + + } + + + private void transfer(ByteBuffer from, ByteBuffer to) { + int max = Math.min(from.remaining(), to.remaining()); + ByteBuffer tmp = from.duplicate (); + tmp.limit (tmp.position() + max); + to.put (tmp); + from.position(from.position() + max); } @@ -169,8 +250,7 @@ public class InternalAprOutputBuffer ext @Override protected boolean hasMoreDataToFlush() { - // TODO - return false; + return bbuf.position() > 0; } @@ -187,24 +267,12 @@ public class InternalAprOutputBuffer ext * Write chunk. */ @Override - public int doWrite(ByteChunk chunk, Response res) - throws IOException { + public int doWrite(ByteChunk chunk, Response res) throws IOException { int len = chunk.getLength(); int start = chunk.getStart(); byte[] b = chunk.getBuffer(); - while (len > 0) { - int thisTime = len; - if (bbuf.position() == bbuf.capacity()) { - flushBuffer(isBlocking()); - } - if (thisTime > bbuf.capacity() - bbuf.position()) { - thisTime = bbuf.capacity() - bbuf.position(); - } - bbuf.put(b, start, thisTime); - len = len - thisTime; - start = start + thisTime; - } + addToBB(b, start,len); byteCount += chunk.getLength(); return chunk.getLength(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu May 2 20:59:05 2013 @@ -50,7 +50,7 @@ public class InternalNioOutputBuffer ext this.response = response; - buf = new byte[headerBufferSize]; + headerBuffer = new byte[headerBufferSize]; outputStreamOutputBuffer = new SocketOutputBuffer(); @@ -185,18 +185,22 @@ public class InternalNioOutputBuffer ext if (pos > 0) { // Sending the response header buffer - addToBB(buf, 0, pos); + addToBB(headerBuffer, 0, pos); } } - private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException { - //try to write to socket first - if (length==0) return; + private synchronized void addToBB(byte[] buf, int offset, int length) + throws IOException { + + if (length == 0) return; + // Try to flush any data in the socket's write buffer first boolean dataLeft = flushBuffer(isBlocking()); + // Keep writing until all the data is written or a non-blocking write + // leaves data in the buffer while (!dataLeft && length>0) { int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer()); length = length - thisTime; @@ -215,6 +219,7 @@ public class InternalNioOutputBuffer ext } } + private void addToBuffers(byte[] buf, int offset, int length) { ByteBufferHolder holder = bufferedWrites.peekLast(); if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { @@ -264,9 +269,7 @@ public class InternalNioOutputBuffer ext } } - dataLeft = hasMoreDataToFlush(); - - return dataLeft; + return hasMoreDataToFlush(); } @Override @@ -281,13 +284,12 @@ public class InternalNioOutputBuffer ext return max; } - private int transfer(ByteBuffer from, ByteBuffer to) { + private void transfer(ByteBuffer from, ByteBuffer to) { int max = Math.min(from.remaining(), to.remaining()); ByteBuffer tmp = from.duplicate (); tmp.limit (tmp.position() + max); to.put (tmp); from.position(from.position() + max); - return max; } @@ -297,16 +299,14 @@ public class InternalNioOutputBuffer ext * This class is an output buffer which will write data to an output * stream. */ - protected class SocketOutputBuffer - implements OutputBuffer { + protected class SocketOutputBuffer implements OutputBuffer { /** * Write chunk. */ @Override - public int doWrite(ByteChunk chunk, Response res) - throws IOException { + public int doWrite(ByteChunk chunk, Response res) throws IOException { int len = chunk.getLength(); int start = chunk.getStart(); Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Thu May 2 20:59:05 2013 @@ -44,7 +44,7 @@ public class InternalOutputBuffer extend this.response = response; - buf = new byte[headerBufferSize]; + headerBuffer = new byte[headerBufferSize]; outputStreamOutputBuffer = new OutputStreamOutputBuffer(); @@ -161,9 +161,9 @@ public class InternalOutputBuffer extend if (pos > 0) { // Sending the response header buffer if (useSocketBuffer) { - socketBuffer.append(buf, 0, pos); + socketBuffer.append(headerBuffer, 0, pos); } else { - outputStream.write(buf, 0, pos); + outputStream.write(headerBuffer, 0, pos); } } @@ -186,7 +186,7 @@ public class InternalOutputBuffer extend @Override protected boolean hasMoreDataToFlush() { - // TODO + // Blocking IO so always returns false. return false; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org