Author: markt
Date: Wed Aug 15 15:06:56 2018
New Revision: 1838104

URL: http://svn.apache.org/viewvc?rev=1838104&view=rev
Log:
Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=62620
Fix corruption of response bodies when writing large bodies using asynchronous 
processing over HTTP/2.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java   (with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Wed Aug 15 15:06:56 
2018
@@ -43,6 +43,7 @@ import org.apache.tomcat.util.buf.Messag
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.http.parser.Host;
 import org.apache.tomcat.util.net.ApplicationBufferHandler;
+import org.apache.tomcat.util.net.WriteBuffer;
 import org.apache.tomcat.util.res.StringManager;
 
 class Stream extends AbstractStream implements HeaderEmitter {
@@ -712,9 +713,10 @@ class Stream extends AbstractStream impl
     }
 
 
-    class StreamOutputBuffer implements HttpOutputBuffer {
+    class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
 
         private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
+        private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
         private volatile long written = 0;
         private volatile boolean closed = false;
         private volatile boolean endOfStreamSent = false;
@@ -742,6 +744,7 @@ class Stream extends AbstractStream impl
                     // Only flush if we have more data to write and the buffer
                     // is full
                     if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
+                        writeBuffer.add(chunk);
                         break;
                     }
                 }
@@ -751,7 +754,33 @@ class Stream extends AbstractStream impl
         }
 
         final synchronized boolean flush(boolean block) throws IOException {
-            return flush(false, block);
+            /*
+             * Need to ensure that there is exactly one call to flush even when
+             * there is no data to write.
+             * Too few calls (i.e. zero) and the end of stream message is not
+             * sent for a completed asynchronous write.
+             * Too many calls and the end of stream message is sent too soon 
and
+             * trailer headers are not sent.
+             */
+            boolean dataLeft = buffer.position() > 0;
+            boolean flushed = false;
+
+            if (dataLeft) {
+                dataLeft = flush(false, block);
+                flushed = true;
+            }
+
+            if (!dataLeft) {
+                if (writeBuffer.isEmpty()) {
+                    if (!flushed) {
+                        dataLeft = flush(false, block);
+                    }
+                } else {
+                    dataLeft = writeBuffer.write(this, block);
+                }
+            }
+
+            return dataLeft;
         }
 
         private final synchronized boolean flush(boolean writeInProgress, 
boolean block)
@@ -827,6 +856,23 @@ class Stream extends AbstractStream impl
         public void flush() throws IOException {
             flush(true);
         }
+
+        @Override
+        public boolean writeFromBuffer(ByteBuffer src, boolean blocking) 
throws IOException {
+            int chunkLimit = src.limit();
+            int offset = 0;
+            while (src.remaining() > 0) {
+                int thisTime = Math.min(buffer.remaining(), src.remaining());
+                src.limit(src.position() + thisTime);
+                buffer.put(src);
+                src.limit(chunkLimit);
+                written += offset;
+                if (flush(true, blocking)) {
+                    return true;
+                }
+            }
+            return false;
+        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Aug 15 
15:06:56 2018
@@ -48,7 +48,6 @@ import javax.net.ssl.SSLSession;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.jsse.JSSESupport;
@@ -575,19 +574,14 @@ public class Nio2Endpoint extends Abstra
                     synchronized (writeCompletionHandler) {
                         if (nBytes.intValue() < 0) {
                             failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
-                        } else if (bufferedWrites.size() > 0) {
+                        } else if (!writeBuffer.isEmpty()) {
                             nestedWriteCompletionCount.get().incrementAndGet();
                             // Continue writing data using a gathering write
                             List<ByteBuffer> arrayList = new ArrayList<>();
                             if (attachment.hasRemaining()) {
                                 arrayList.add(attachment);
                             }
-                            for (ByteBufferHolder buffer : bufferedWrites) {
-                                buffer.flip();
-                                arrayList.add(buffer.getBuf());
-                            }
-                            bufferedWrites.clear();
-                            ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                            ByteBuffer[] array = 
writeBuffer.transferToListAsArray(arrayList);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), 
TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
@@ -633,7 +627,7 @@ public class Nio2Endpoint extends Abstra
                     synchronized (writeCompletionHandler) {
                         if (nBytes.longValue() < 0) {
                             failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
-                        } else if (bufferedWrites.size() > 0 || 
arrayHasData(attachment)) {
+                        } else if (!writeBuffer.isEmpty() || 
arrayHasData(attachment)) {
                             // Continue writing data
                             nestedWriteCompletionCount.get().incrementAndGet();
                             List<ByteBuffer> arrayList = new ArrayList<>();
@@ -642,12 +636,7 @@ public class Nio2Endpoint extends Abstra
                                     arrayList.add(buffer);
                                 }
                             }
-                            for (ByteBufferHolder buffer : bufferedWrites) {
-                                buffer.flip();
-                                arrayList.add(buffer.getBuf());
-                            }
-                            bufferedWrites.clear();
-                            ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                            ByteBuffer[] array = 
writeBuffer.transferToListAsArray(arrayList);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), 
TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
@@ -1187,11 +1176,11 @@ public class Nio2Endpoint extends Abstra
                     off = off + thisTime;
                     if (len > 0) {
                         // Remaining data must be buffered
-                        addToBuffers(buf, off, len);
+                        writeBuffer.add(buf, off, len);
                     }
                     flushNonBlocking(true);
                 } else {
-                    addToBuffers(buf, off, len);
+                    writeBuffer.add(buf, off, len);
                 }
             }
         }
@@ -1222,11 +1211,11 @@ public class Nio2Endpoint extends Abstra
                     transfer(from, socketBufferHandler.getWriteBuffer());
                     if (from.remaining() > 0) {
                         // Remaining data must be buffered
-                        addToBuffers(from);
+                        writeBuffer.add(from);
                     }
                     flushNonBlocking(true);
                 } else {
-                    addToBuffers(from);
+                    writeBuffer.add(from);
                 }
             }
         }
@@ -1297,18 +1286,13 @@ public class Nio2Endpoint extends Abstra
             synchronized (writeCompletionHandler) {
                 if (hasPermit || writePending.tryAcquire()) {
                     socketBufferHandler.configureWriteBufferForRead();
-                    if (bufferedWrites.size() > 0) {
+                    if (!writeBuffer.isEmpty()) {
                         // Gathering write of the main buffer plus all 
leftovers
                         List<ByteBuffer> arrayList = new ArrayList<>();
                         if 
(socketBufferHandler.getWriteBuffer().hasRemaining()) {
                             
arrayList.add(socketBufferHandler.getWriteBuffer());
                         }
-                        for (ByteBufferHolder buffer : bufferedWrites) {
-                            buffer.flip();
-                            arrayList.add(buffer.getBuf());
-                        }
-                        bufferedWrites.clear();
-                        ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                        ByteBuffer[] array = 
writeBuffer.transferToListAsArray(arrayList);
                         Nio2Endpoint.startInline();
                         getSocket().write(array, 0, array.length, 
toNio2Timeout(getWriteTimeout()),
                                 TimeUnit.MILLISECONDS, array, 
gatheringWriteCompletionHandler);
@@ -1336,7 +1320,7 @@ public class Nio2Endpoint extends Abstra
         public boolean hasDataToWrite() {
             synchronized (writeCompletionHandler) {
                 return !socketBufferHandler.isWriteBufferEmpty() ||
-                        bufferedWrites.size() > 0 || getError() != null;
+                        !writeBuffer.isEmpty() || getError() != null;
             }
         }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Wed Aug 
15 15:06:56 2018
@@ -19,9 +19,7 @@ package org.apache.tomcat.util.net;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.util.Iterator;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -30,7 +28,6 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.res.StringManager;
 
 public abstract class SocketWrapperBase<E> {
@@ -80,17 +77,17 @@ public abstract class SocketWrapperBase<
     protected volatile SocketBufferHandler socketBufferHandler = null;
 
     /**
+     * The max size of the individual buffered write buffers
+     */
+    protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
+
+    /**
      * For "non-blocking" writes use an external set of buffers. Although the
      * API only allows one non-blocking write at a time, due to buffering and
      * the possible need to write HTTP headers, there may be more than one 
write
      * to the OutputBuffer.
      */
-    protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = new 
LinkedBlockingDeque<>();
-
-    /**
-     * The max size of the buffered write buffer
-     */
-    protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
+    protected final WriteBuffer writeBuffer = new 
WriteBuffer(bufferedWriteSize);
 
     public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
         this.socket = socket;
@@ -252,7 +249,7 @@ public abstract class SocketWrapperBase<
     public SocketBufferHandler getSocketBufferHandler() { return 
socketBufferHandler; }
 
     public boolean hasDataToWrite() {
-        return !socketBufferHandler.isWriteBufferEmpty() || 
bufferedWrites.size() > 0;
+        return !socketBufferHandler.isWriteBufferEmpty() || 
!writeBuffer.isEmpty();
     }
 
     /**
@@ -282,7 +279,7 @@ public abstract class SocketWrapperBase<
         if (socketBufferHandler == null) {
             throw new IllegalStateException(sm.getString("socket.closed"));
         }
-        return socketBufferHandler.isWriteBufferWritable() && 
bufferedWrites.size() == 0;
+        return socketBufferHandler.isWriteBufferWritable() && 
writeBuffer.isEmpty();
     }
 
 
@@ -505,7 +502,7 @@ public abstract class SocketWrapperBase<
      * @throws IOException If an IO error occurs during the write
      */
     protected void writeNonBlocking(byte[] buf, int off, int len) throws 
IOException {
-        if (bufferedWrites.size() == 0 && 
socketBufferHandler.isWriteBufferWritable()) {
+        if (writeBuffer.isEmpty() && 
socketBufferHandler.isWriteBufferWritable()) {
             socketBufferHandler.configureWriteBufferForWrite();
             int thisTime = transfer(buf, off, len, 
socketBufferHandler.getWriteBuffer());
             len = len - thisTime;
@@ -527,7 +524,7 @@ public abstract class SocketWrapperBase<
 
         if (len > 0) {
             // Remaining data must be buffered
-            addToBuffers(buf, off, len);
+            writeBuffer.add(buf, off, len);
         }
     }
 
@@ -544,18 +541,18 @@ public abstract class SocketWrapperBase<
      * @throws IOException If an IO error occurs during the write
      */
     protected void writeNonBlocking(ByteBuffer from) throws IOException {
-        if (bufferedWrites.size() == 0 && 
socketBufferHandler.isWriteBufferWritable()) {
+        if (writeBuffer.isEmpty() && 
socketBufferHandler.isWriteBufferWritable()) {
             writeNonBlockingInternal(from);
         }
 
         if (from.remaining() > 0) {
             // Remaining data must be buffered
-            addToBuffers(from);
+            writeBuffer.add(from);
         }
     }
 
 
-    private boolean writeNonBlockingInternal(ByteBuffer from) throws 
IOException {
+    boolean writeNonBlockingInternal(ByteBuffer from) throws IOException {
         if (socketBufferHandler.isWriteBufferEmpty()) {
             return writeByteBufferNonBlocking(from);
         } else {
@@ -627,16 +624,8 @@ public abstract class SocketWrapperBase<
     protected void flushBlocking() throws IOException {
         doWrite(true);
 
-        if (bufferedWrites.size() > 0) {
-            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
-            while (bufIter.hasNext()) {
-                ByteBufferHolder buffer = bufIter.next();
-                buffer.flip();
-                writeBlocking(buffer.getBuf());
-                if (buffer.getBuf().remaining() == 0) {
-                    bufIter.remove();
-                }
-            }
+        if (!writeBuffer.isEmpty()) {
+            writeBuffer.write(this, true);
 
             if (!socketBufferHandler.isWriteBufferEmpty()) {
                 doWrite(true);
@@ -655,16 +644,8 @@ public abstract class SocketWrapperBase<
             dataLeft = !socketBufferHandler.isWriteBufferEmpty();
         }
 
-        if (!dataLeft && bufferedWrites.size() > 0) {
-            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
-            while (!dataLeft && bufIter.hasNext()) {
-                ByteBufferHolder buffer = bufIter.next();
-                buffer.flip();
-                dataLeft = writeNonBlockingInternal(buffer.getBuf());
-                if (buffer.getBuf().remaining() == 0) {
-                    bufIter.remove();
-                }
-            }
+        if (!dataLeft && !writeBuffer.isEmpty()) {
+            dataLeft = writeBuffer.write(this, false);
 
             if (!dataLeft && !socketBufferHandler.isWriteBufferEmpty()) {
                 doWrite(false);
@@ -706,29 +687,6 @@ public abstract class SocketWrapperBase<
     protected abstract void doWrite(boolean block, ByteBuffer from) throws 
IOException;
 
 
-    protected void addToBuffers(byte[] buf, int offset, int length) {
-        ByteBufferHolder holder = getByteBufferHolder(length);
-        holder.getBuf().put(buf, offset, length);
-    }
-
-
-    protected void addToBuffers(ByteBuffer from) {
-        ByteBufferHolder holder = getByteBufferHolder(from.remaining());
-        holder.getBuf().put(from);
-    }
-
-
-    private ByteBufferHolder getByteBufferHolder(int capacity) {
-        ByteBufferHolder holder = bufferedWrites.peekLast();
-        if (holder == null || holder.isFlipped() || 
holder.getBuf().remaining() < capacity) {
-            ByteBuffer buffer = 
ByteBuffer.allocate(Math.max(bufferedWriteSize, capacity));
-            holder = new ByteBufferHolder(buffer, false);
-            bufferedWrites.add(holder);
-        }
-        return holder;
-    }
-
-
     public void processSocket(SocketEvent socketStatus, boolean dispatch) {
         endpoint.processSocket(this, socketStatus, dispatch);
     }

Added: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java?rev=1838104&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java Wed Aug 15 
15:06:56 2018
@@ -0,0 +1,125 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.tomcat.util.buf.ByteBufferHolder;
+
+/**
+ * Provides an expandable set of buffers for writes. Non-blocking writes can be
+ * of any size and may not be able to be written immediately or wholly 
contained
+ * in the buffer used to perform the writes to the next layer. This class
+ * provides a buffering capability to allow such writes to return immediately
+ * and also allows for the user provided buffers to be re-used / recycled as
+ * required.
+ */
+public class WriteBuffer {
+
+    private final int bufferSize;
+
+    private final LinkedBlockingDeque<ByteBufferHolder> buffers = new 
LinkedBlockingDeque<>();
+
+    public WriteBuffer(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+
+    void add(byte[] buf, int offset, int length) {
+        ByteBufferHolder holder = getByteBufferHolder(length);
+        holder.getBuf().put(buf, offset, length);
+    }
+
+
+    public void add(ByteBuffer from) {
+        ByteBufferHolder holder = getByteBufferHolder(from.remaining());
+        holder.getBuf().put(from);
+    }
+
+
+    private ByteBufferHolder getByteBufferHolder(int capacity) {
+        ByteBufferHolder holder = buffers.peekLast();
+        if (holder == null || holder.isFlipped() || 
holder.getBuf().remaining() < capacity) {
+            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferSize, 
capacity));
+            holder = new ByteBufferHolder(buffer, false);
+            buffers.add(holder);
+        }
+        return holder;
+    }
+
+
+    public boolean isEmpty() {
+        return buffers.isEmpty();
+    }
+
+
+    ByteBuffer[] transferToListAsArray(List<ByteBuffer> target) {
+        for (ByteBufferHolder buffer : buffers) {
+            buffer.flip();
+            target.add(buffer.getBuf());
+        }
+        buffers.clear();
+        return target.toArray(new ByteBuffer[target.size()]);
+    }
+
+
+    boolean write(SocketWrapperBase<?> socketWrapper, boolean blocking) throws 
IOException {
+        Iterator<ByteBufferHolder> bufIter = buffers.iterator();
+        boolean dataLeft = false;
+        while (!dataLeft && bufIter.hasNext()) {
+            ByteBufferHolder buffer = bufIter.next();
+            buffer.flip();
+            if (blocking) {
+                socketWrapper.writeBlocking(buffer.getBuf());
+            } else {
+                dataLeft = 
socketWrapper.writeNonBlockingInternal(buffer.getBuf());
+            }
+            if (buffer.getBuf().remaining() == 0) {
+                bufIter.remove();
+            }
+        }
+        return dataLeft;
+    }
+
+
+    public boolean write(Sink sink, boolean blocking) throws IOException {
+        Iterator<ByteBufferHolder> bufIter = buffers.iterator();
+        boolean dataLeft = false;
+        while (!dataLeft && bufIter.hasNext()) {
+            ByteBufferHolder buffer = bufIter.next();
+            buffer.flip();
+            dataLeft = sink.writeFromBuffer(buffer.getBuf(), blocking);
+            if (!dataLeft) {
+                bufIter.remove();
+            }
+        }
+        return dataLeft;
+    }
+
+
+    /**
+     * Interface implemented by clients of the WriteBuffer to enable data to be
+     * written back out from the buffer.
+     */
+    public interface Sink {
+        boolean writeFromBuffer(ByteBuffer buffer, boolean block) throws 
IOException;
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/WriteBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838104&r1=1838103&r2=1838104&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Wed Aug 15 15:06:56 2018
@@ -51,6 +51,10 @@
         Fix potential deadlocks when using asynchronous Servlet processing with
         HTTP/2 connectors. (markt)
       </fix>
+      <fix>
+        <bug>62620</bug>: Fix corruption of response bodies when writing large
+        bodies using asynchronous processing over HTTP/2. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Other">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to