Author: markt
Date: Wed Aug 15 15:06:56 2018
New Revision: 1838104

URL: http://svn.apache.org/viewvc?rev=1838104&view=rev
Log:
Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=62620
Fix corruption of response bodies when writing large bodies using asynchronous processing over HTTP/2.

Added: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838104&r1=1838103&r2=1838104&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Wed Aug 15 15:06:56 2018 @@ -43,6 +43,7 @@ import org.apache.tomcat.util.buf.Messag import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.http.parser.Host; import org.apache.tomcat.util.net.ApplicationBufferHandler; +import org.apache.tomcat.util.net.WriteBuffer; import org.apache.tomcat.util.res.StringManager; class Stream extends AbstractStream implements HeaderEmitter { @@ -712,9 +713,10 @@ class Stream extends AbstractStream impl } - class StreamOutputBuffer implements HttpOutputBuffer { + class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink { private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); + private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024); private volatile long written = 0; private volatile boolean closed = false; private volatile boolean endOfStreamSent = false; @@ -742,6 +744,7 @@ class Stream extends AbstractStream impl // Only flush if we have more data to write and the buffer // is full if (flush(true, coyoteResponse.getWriteListener() == null)) { + writeBuffer.add(chunk); break; } } @@ -751,7 +754,33 @@ class Stream extends AbstractStream impl } final synchronized boolean flush(boolean block) throws IOException { - return flush(false, block); + /* + * Need to ensure that 
there is exactly one call to flush even when + * there is no data to write. + * Too few calls (i.e. zero) and the end of stream message is not + * sent for a completed asynchronous write. + * Too many calls and the end of stream message is sent too soon and + * trailer headers are not sent. + */ + boolean dataLeft = buffer.position() > 0; + boolean flushed = false; + + if (dataLeft) { + dataLeft = flush(false, block); + flushed = true; + } + + if (!dataLeft) { + if (writeBuffer.isEmpty()) { + if (!flushed) { + dataLeft = flush(false, block); + } + } else { + dataLeft = writeBuffer.write(this, block); + } + } + + return dataLeft; } private final synchronized boolean flush(boolean writeInProgress, boolean block) @@ -827,6 +856,23 @@ class Stream extends AbstractStream impl public void flush() throws IOException { flush(true); } + + @Override + public boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { + int chunkLimit = src.limit(); + int offset = 0; + while (src.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), src.remaining()); + src.limit(src.position() + thisTime); + buffer.put(src); + src.limit(chunkLimit); + written += offset; + if (flush(true, blocking)) { + return true; + } + } + return false; + } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1838104&r1=1838103&r2=1838104&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Aug 15 15:06:56 2018 @@ -48,7 +48,6 @@ import javax.net.ssl.SSLSession; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; -import org.apache.tomcat.util.buf.ByteBufferHolder; import 
org.apache.tomcat.util.collections.SynchronizedStack; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.jsse.JSSESupport; @@ -575,19 +574,14 @@ public class Nio2Endpoint extends Abstra synchronized (writeCompletionHandler) { if (nBytes.intValue() < 0) { failed(new EOFException(sm.getString("iob.failedwrite")), attachment); - } else if (bufferedWrites.size() > 0) { + } else if (!writeBuffer.isEmpty()) { nestedWriteCompletionCount.get().incrementAndGet(); // Continue writing data using a gathering write List<ByteBuffer> arrayList = new ArrayList<>(); if (attachment.hasRemaining()) { arrayList.add(attachment); } - for (ByteBufferHolder buffer : bufferedWrites) { - buffer.flip(); - arrayList.add(buffer.getBuf()); - } - bufferedWrites.clear(); - ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); + ByteBuffer[] array = writeBuffer.transferToListAsArray(arrayList); getSocket().write(array, 0, array.length, toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); @@ -633,7 +627,7 @@ public class Nio2Endpoint extends Abstra synchronized (writeCompletionHandler) { if (nBytes.longValue() < 0) { failed(new EOFException(sm.getString("iob.failedwrite")), attachment); - } else if (bufferedWrites.size() > 0 || arrayHasData(attachment)) { + } else if (!writeBuffer.isEmpty() || arrayHasData(attachment)) { // Continue writing data nestedWriteCompletionCount.get().incrementAndGet(); List<ByteBuffer> arrayList = new ArrayList<>(); @@ -642,12 +636,7 @@ public class Nio2Endpoint extends Abstra arrayList.add(buffer); } } - for (ByteBufferHolder buffer : bufferedWrites) { - buffer.flip(); - arrayList.add(buffer.getBuf()); - } - bufferedWrites.clear(); - ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); + ByteBuffer[] array = writeBuffer.transferToListAsArray(arrayList); getSocket().write(array, 0, array.length, toNio2Timeout(getWriteTimeout()), 
TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); @@ -1187,11 +1176,11 @@ public class Nio2Endpoint extends Abstra off = off + thisTime; if (len > 0) { // Remaining data must be buffered - addToBuffers(buf, off, len); + writeBuffer.add(buf, off, len); } flushNonBlocking(true); } else { - addToBuffers(buf, off, len); + writeBuffer.add(buf, off, len); } } } @@ -1222,11 +1211,11 @@ public class Nio2Endpoint extends Abstra transfer(from, socketBufferHandler.getWriteBuffer()); if (from.remaining() > 0) { // Remaining data must be buffered - addToBuffers(from); + writeBuffer.add(from); } flushNonBlocking(true); } else { - addToBuffers(from); + writeBuffer.add(from); } } } @@ -1297,18 +1286,13 @@ public class Nio2Endpoint extends Abstra synchronized (writeCompletionHandler) { if (hasPermit || writePending.tryAcquire()) { socketBufferHandler.configureWriteBufferForRead(); - if (bufferedWrites.size() > 0) { + if (!writeBuffer.isEmpty()) { // Gathering write of the main buffer plus all leftovers List<ByteBuffer> arrayList = new ArrayList<>(); if (socketBufferHandler.getWriteBuffer().hasRemaining()) { arrayList.add(socketBufferHandler.getWriteBuffer()); } - for (ByteBufferHolder buffer : bufferedWrites) { - buffer.flip(); - arrayList.add(buffer.getBuf()); - } - bufferedWrites.clear(); - ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); + ByteBuffer[] array = writeBuffer.transferToListAsArray(arrayList); Nio2Endpoint.startInline(); getSocket().write(array, 0, array.length, toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); @@ -1336,7 +1320,7 @@ public class Nio2Endpoint extends Abstra public boolean hasDataToWrite() { synchronized (writeCompletionHandler) { return !socketBufferHandler.isWriteBufferEmpty() || - bufferedWrites.size() > 0 || getError() != null; + !writeBuffer.isEmpty() || getError() != null; } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1838104&r1=1838103&r2=1838104&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Wed Aug 15 15:06:56 2018 @@ -19,9 +19,7 @@ package org.apache.tomcat.util.net; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.util.Iterator; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -30,7 +28,6 @@ import java.util.concurrent.locks.Reentr import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.res.StringManager; public abstract class SocketWrapperBase<E> { @@ -80,17 +77,17 @@ public abstract class SocketWrapperBase< protected volatile SocketBufferHandler socketBufferHandler = null; /** + * The max size of the individual buffered write buffers + */ + protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer + + /** * For "non-blocking" writes use an external set of buffers. Although the * API only allows one non-blocking write at a time, due to buffering and * the possible need to write HTTP headers, there may be more than one write * to the OutputBuffer. 
*/ - protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = new LinkedBlockingDeque<>(); - - /** - * The max size of the buffered write buffer - */ - protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer + protected final WriteBuffer writeBuffer = new WriteBuffer(bufferedWriteSize); public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; @@ -252,7 +249,7 @@ public abstract class SocketWrapperBase< public SocketBufferHandler getSocketBufferHandler() { return socketBufferHandler; } public boolean hasDataToWrite() { - return !socketBufferHandler.isWriteBufferEmpty() || bufferedWrites.size() > 0; + return !socketBufferHandler.isWriteBufferEmpty() || !writeBuffer.isEmpty(); } /** @@ -282,7 +279,7 @@ public abstract class SocketWrapperBase< if (socketBufferHandler == null) { throw new IllegalStateException(sm.getString("socket.closed")); } - return socketBufferHandler.isWriteBufferWritable() && bufferedWrites.size() == 0; + return socketBufferHandler.isWriteBufferWritable() && writeBuffer.isEmpty(); } @@ -505,7 +502,7 @@ public abstract class SocketWrapperBase< * @throws IOException If an IO error occurs during the write */ protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException { - if (bufferedWrites.size() == 0 && socketBufferHandler.isWriteBufferWritable()) { + if (writeBuffer.isEmpty() && socketBufferHandler.isWriteBufferWritable()) { socketBufferHandler.configureWriteBufferForWrite(); int thisTime = transfer(buf, off, len, socketBufferHandler.getWriteBuffer()); len = len - thisTime; @@ -527,7 +524,7 @@ public abstract class SocketWrapperBase< if (len > 0) { // Remaining data must be buffered - addToBuffers(buf, off, len); + writeBuffer.add(buf, off, len); } } @@ -544,18 +541,18 @@ public abstract class SocketWrapperBase< * @throws IOException If an IO error occurs during the write */ protected void writeNonBlocking(ByteBuffer from) throws IOException { - if 
(bufferedWrites.size() == 0 && socketBufferHandler.isWriteBufferWritable()) { + if (writeBuffer.isEmpty() && socketBufferHandler.isWriteBufferWritable()) { writeNonBlockingInternal(from); } if (from.remaining() > 0) { // Remaining data must be buffered - addToBuffers(from); + writeBuffer.add(from); } } - private boolean writeNonBlockingInternal(ByteBuffer from) throws IOException { + boolean writeNonBlockingInternal(ByteBuffer from) throws IOException { if (socketBufferHandler.isWriteBufferEmpty()) { return writeByteBufferNonBlocking(from); } else { @@ -627,16 +624,8 @@ public abstract class SocketWrapperBase< protected void flushBlocking() throws IOException { doWrite(true); - if (bufferedWrites.size() > 0) { - Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); - while (bufIter.hasNext()) { - ByteBufferHolder buffer = bufIter.next(); - buffer.flip(); - writeBlocking(buffer.getBuf()); - if (buffer.getBuf().remaining() == 0) { - bufIter.remove(); - } - } + if (!writeBuffer.isEmpty()) { + writeBuffer.write(this, true); if (!socketBufferHandler.isWriteBufferEmpty()) { doWrite(true); @@ -655,16 +644,8 @@ public abstract class SocketWrapperBase< dataLeft = !socketBufferHandler.isWriteBufferEmpty(); } - if (!dataLeft && bufferedWrites.size() > 0) { - Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); - while (!dataLeft && bufIter.hasNext()) { - ByteBufferHolder buffer = bufIter.next(); - buffer.flip(); - dataLeft = writeNonBlockingInternal(buffer.getBuf()); - if (buffer.getBuf().remaining() == 0) { - bufIter.remove(); - } - } + if (!dataLeft && !writeBuffer.isEmpty()) { + dataLeft = writeBuffer.write(this, false); if (!dataLeft && !socketBufferHandler.isWriteBufferEmpty()) { doWrite(false); @@ -706,29 +687,6 @@ public abstract class SocketWrapperBase< protected abstract void doWrite(boolean block, ByteBuffer from) throws IOException; - protected void addToBuffers(byte[] buf, int offset, int length) { - ByteBufferHolder holder = 
getByteBufferHolder(length); - holder.getBuf().put(buf, offset, length); - } - - - protected void addToBuffers(ByteBuffer from) { - ByteBufferHolder holder = getByteBufferHolder(from.remaining()); - holder.getBuf().put(from); - } - - - private ByteBufferHolder getByteBufferHolder(int capacity) { - ByteBufferHolder holder = bufferedWrites.peekLast(); - if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < capacity) { - ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize, capacity)); - holder = new ByteBufferHolder(buffer, false); - bufferedWrites.add(holder); - } - return holder; - } - - public void processSocket(SocketEvent socketStatus, boolean dispatch) { endpoint.processSocket(this, socketStatus, dispatch); } Added: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java?rev=1838104&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java Wed Aug 15 15:06:56 2018 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; + +import org.apache.tomcat.util.buf.ByteBufferHolder; + +/** + * Provides an expandable set of buffers for writes. Non-blocking writes can be + * of any size and may not be able to be written immediately or wholly contained + * in the buffer used to perform the writes to the next layer. This class + * provides a buffering capability to allow such writes to return immediately + * and also allows for the user provided buffers to be re-used / recycled as + * required. + */ +public class WriteBuffer { + + private final int bufferSize; + + private final LinkedBlockingDeque<ByteBufferHolder> buffers = new LinkedBlockingDeque<>(); + + public WriteBuffer(int bufferSize) { + this.bufferSize = bufferSize; + } + + + void add(byte[] buf, int offset, int length) { + ByteBufferHolder holder = getByteBufferHolder(length); + holder.getBuf().put(buf, offset, length); + } + + + public void add(ByteBuffer from) { + ByteBufferHolder holder = getByteBufferHolder(from.remaining()); + holder.getBuf().put(from); + } + + + private ByteBufferHolder getByteBufferHolder(int capacity) { + ByteBufferHolder holder = buffers.peekLast(); + if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < capacity) { + ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferSize, capacity)); + holder = new ByteBufferHolder(buffer, false); + buffers.add(holder); + } + return holder; + } + + + public boolean isEmpty() { + return buffers.isEmpty(); + } + + + ByteBuffer[] transferToListAsArray(List<ByteBuffer> target) { + for (ByteBufferHolder buffer : buffers) { + buffer.flip(); + target.add(buffer.getBuf()); + } + buffers.clear(); + return target.toArray(new ByteBuffer[target.size()]); + 
} + + + boolean write(SocketWrapperBase<?> socketWrapper, boolean blocking) throws IOException { + Iterator<ByteBufferHolder> bufIter = buffers.iterator(); + boolean dataLeft = false; + while (!dataLeft && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + if (blocking) { + socketWrapper.writeBlocking(buffer.getBuf()); + } else { + dataLeft = socketWrapper.writeNonBlockingInternal(buffer.getBuf()); + } + if (buffer.getBuf().remaining() == 0) { + bufIter.remove(); + } + } + return dataLeft; + } + + + public boolean write(Sink sink, boolean blocking) throws IOException { + Iterator<ByteBufferHolder> bufIter = buffers.iterator(); + boolean dataLeft = false; + while (!dataLeft && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + dataLeft = sink.writeFromBuffer(buffer.getBuf(), blocking); + if (!dataLeft) { + bufIter.remove(); + } + } + return dataLeft; + } + + + /** + * Interface implemented by clients of the WriteBuffer to enable data to be + * written back out from the buffer. + */ + public interface Sink { + boolean writeFromBuffer(ByteBuffer buffer, boolean block) throws IOException; + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838104&r1=1838103&r2=1838104&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Wed Aug 15 15:06:56 2018 @@ -51,6 +51,10 @@ Fix potential deadlocks when using asynchronous Servlet processing with HTTP/2 connectors. (markt) </fix> + <fix> + <bug>62620</bug>: Fix corruption of response bodies when writing large + bodies using asynchronous processing over HTTP/2. 
(markt) + </fix> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org