Author: markt
Date: Thu Jun 25 14:12:08 2015
New Revision: 1687527

URL: http://svn.apache.org/r1687527
Log:
Refactor the backlog/window size syncs after FindBugs highlighted a couple of 
timing issues.

Modified:
    tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu Jun 25 14:12:08 2015
@@ -36,7 +36,6 @@ abstract class AbstractStream {
     private volatile AbstractStream parentStream = null;
     private final Set<AbstractStream> childStreams = new HashSet<>();
     private long windowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE;
-    private final Object windowSizeLock = new Object();
 
     public Integer getIdentifier() {
         return identifier;
@@ -95,17 +94,13 @@ abstract class AbstractStream {
     }
 
 
-    protected void setWindowSize(long windowSize) {
-        synchronized (windowSizeLock) {
-            this.windowSize = windowSize;
-        }
+    protected synchronized void setWindowSize(long windowSize) {
+        this.windowSize = windowSize;
     }
 
 
-    protected long getWindowSize() {
-        synchronized (windowSizeLock) {
-            return windowSize;
-        }
+    protected synchronized long getWindowSize() {
+        return windowSize;
     }
 
 
@@ -113,32 +108,28 @@ abstract class AbstractStream {
      * @param increment
      * @throws Http2Exception
      */
-    protected void incrementWindowSize(int increment) throws Http2Exception {
-        synchronized (windowSizeLock) {
-            // Overflow protection
-            if (Long.MAX_VALUE - increment < windowSize) {
-                windowSize = Long.MAX_VALUE;
-            } else {
-                windowSize += increment;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("abstractStream.windowSizeInc", 
getConnectionId(),
-                        getIdentifier(), Integer.toString(increment), 
Long.toString(windowSize)));
-            }
+    protected synchronized void incrementWindowSize(int increment) throws 
Http2Exception {
+        // Overflow protection
+        if (Long.MAX_VALUE - increment < windowSize) {
+            windowSize = Long.MAX_VALUE;
+        } else {
+            windowSize += increment;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("abstractStream.windowSizeInc", 
getConnectionId(),
+                    getIdentifier(), Integer.toString(increment), 
Long.toString(windowSize)));
         }
     }
 
 
-    protected void decrementWindowSize(int decrement) {
+    protected synchronized void decrementWindowSize(int decrement) {
         // No need for overflow protection here. Decrement can never be larger
         // than Integer.MAX_VALUE and once windowSize goes negative no further
         // decrements are permitted
-        synchronized (windowSizeLock) {
-            windowSize -= decrement;
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("abstractStream.windowSizeDec", 
getConnectionId(),
-                        getIdentifier(), Integer.toString(decrement), 
Long.toString(windowSize)));
-            }
+        windowSize -= decrement;
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("abstractStream.windowSizeDec", 
getConnectionId(),
+                    getIdentifier(), Integer.toString(decrement), 
Long.toString(windowSize)));
         }
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Jun 25 14:12:08 2015
@@ -124,7 +124,6 @@ public class Http2UpgradeHandler extends
     private volatile int maxProcessedStreamId;
 
     // Tracking for when the connection is blocked (windowSize < 1)
-    private final Object backLogLock = new Object();
     private final Map<AbstractStream,int[]> backLogStreams = new 
ConcurrentHashMap<>();
     private long backLogSize = 0;
 
@@ -486,51 +485,64 @@ public class Http2UpgradeHandler extends
     }
 
 
-    int reserveWindowSize(Stream stream, int toWrite) {
-        int result;
-        synchronized (backLogLock) {
-            long windowSize = getWindowSize();
-            if (windowSize < 1 || backLogSize > 0) {
-                // Has this stream been granted an allocation
-                int[] value = backLogStreams.remove(stream);
-                if (value != null && value[1] > 0) {
-                    result = value[1];
-                } else {
-                    value = new int[] { toWrite, 0 };
-                    backLogStreams.put(stream, value);
-                    backLogSize += toWrite;
-                    // Add the parents as well
-                    AbstractStream parent = stream.getParentStream();
-                    while (parent != null && 
backLogStreams.putIfAbsent(parent, new int[2]) == null) {
-                        parent = parent.getParentStream();
+    int reserveWindowSize(Stream stream, int reservation) {
+        // Need to be holding the stream lock so releaseBacklog() can't notify
+        // this thread until after this thread enters wait()
+        int allocation = 0;
+        synchronized (stream) {
+            do {
+                synchronized (this) {
+                    long windowSize = getWindowSize();
+                    if (windowSize < 1 || backLogSize > 0) {
+                        // Has this stream been granted an allocation
+                        int[] value = backLogStreams.remove(stream);
+                        if (value != null && value[1] > 0) {
+                            allocation = value[1];
+                            decrementWindowSize(allocation);
+                        } else {
+                            value = new int[] { reservation, 0 };
+                            backLogStreams.put(stream, value);
+                            backLogSize += reservation;
+                            // Add the parents as well
+                            AbstractStream parent = stream.getParentStream();
+                            while (parent != null && 
backLogStreams.putIfAbsent(parent, new int[2]) == null) {
+                                parent = parent.getParentStream();
+                            }
+                        }
+                    } else if (windowSize < reservation) {
+                        allocation = (int) windowSize;
+                        decrementWindowSize(allocation);
+                    } else {
+                        allocation = reservation;
+                        decrementWindowSize(allocation);
                     }
-                    result = 0;
                 }
-            } else if (windowSize < toWrite) {
-                result = (int) windowSize;
-            } else {
-                result = toWrite;
-            }
-            decrementWindowSize(result);
+                if (allocation == 0) {
+                    try {
+                        stream.wait();
+                    } catch (InterruptedException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+            } while (allocation == 0);
         }
-        return result;
+        return allocation;
     }
 
 
 
     @Override
-    protected void incrementWindowSize(int increment) throws Http2Exception {
-        synchronized (backLogLock) {
-            long windowSize = getWindowSize();
-            if (windowSize < 1 && windowSize + increment > 0) {
-                releaseBackLog(increment);
-            }
-            super.incrementWindowSize(increment);
+    protected synchronized void incrementWindowSize(int increment) throws 
Http2Exception {
+        long windowSize = getWindowSize();
+        if (windowSize < 1 && windowSize + increment > 0) {
+            releaseBackLog(increment);
         }
+        super.incrementWindowSize(increment);
     }
 
 
-    private void releaseBackLog(int increment) {
+    private synchronized void releaseBackLog(int increment) {
         if (backLogSize < increment) {
             // Can clear the whole backlog
             for (AbstractStream stream : backLogStreams.keySet()) {

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1687527&r1=1687526&r2=1687527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu Jun 25 14:12:08 2015
@@ -119,28 +119,37 @@ public class Stream extends AbstractStre
 
 
     @Override
-    public void incrementWindowSize(int windowSizeIncrement) throws 
Http2Exception {
+    public synchronized void incrementWindowSize(int windowSizeIncrement) 
throws Http2Exception {
         // If this is zero then any thread that has been trying to write for
         // this stream will be waiting. Notify that thread it can continue. Use
         // notify all even though only one thread is waiting to be on the safe
         // side.
-        boolean notify = getWindowSize() == 0;
+        boolean notify = getWindowSize() < 1;
         super.incrementWindowSize(windowSizeIncrement);
-        if (notify) {
-            synchronized (this) {
-                notifyAll();
-            }
+        if (notify && getWindowSize() > 0) {
+            notifyAll();
         }
     }
 
 
-    private int checkWindowSize(int reservation) {
+    private synchronized int reserveWindowSize(int reservation) {
         long windowSize = getWindowSize();
-        if (reservation > windowSize) {
-            return (int) windowSize;
+        while (windowSize < 1) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+            }
+            windowSize = getWindowSize();
+        }
+        int allocation;
+        if (windowSize < reservation) {
+            allocation = (int) windowSize;
         } else {
-            return reservation;
+            allocation = reservation;
         }
+        decrementWindowSize(allocation);
+        return allocation;
     }
 
 
@@ -277,8 +286,13 @@ public class Stream extends AbstractStre
         private volatile long written = 0;
         private volatile boolean closed = false;
 
+        /* The write methods are synchronized to ensure that only one thread at
+         * a time is able to access the buffer. Without this protection, a
+         * client that performed concurrent writes could corrupt the buffer.
+         */
+
         @Override
-        public int doWrite(ByteChunk chunk) throws IOException {
+        public synchronized int doWrite(ByteChunk chunk) throws IOException {
             if (closed) {
                 // TODO i18n
                 throw new IllegalStateException();
@@ -300,11 +314,11 @@ public class Stream extends AbstractStre
             return offset;
         }
 
-        public void flush() throws IOException {
+        public synchronized void flush() throws IOException {
             flush(false);
         }
 
-        private void flush(boolean writeInProgress) throws IOException {
+        private synchronized void flush(boolean writeInProgress) throws 
IOException {
             if (!coyoteResponse.isCommitted()) {
                 coyoteResponse.sendHeaders();
             }
@@ -314,51 +328,19 @@ public class Stream extends AbstractStre
             }
             buffer.flip();
             int left = buffer.remaining();
-            int thisWriteStream;
             while (left > 0) {
-                // Flow control for the Stream
-                do {
-                    thisWriteStream = checkWindowSize(left);
-                    if (thisWriteStream < 1) {
-                        // Need to block until a WindowUpdate message is
-                        // processed for this stream
-                        synchronized (Stream.this) {
-                            try {
-                                Stream.this.wait();
-                            } catch (InterruptedException e) {
-                                // TODO: Possible shutdown?
-                            }
-                        }
-                    }
-                } while (thisWriteStream < 1);
-
-                // Flow control for the connection
-                int thisWrite;
-                do {
-                    thisWrite = handler.reserveWindowSize(Stream.this, 
thisWriteStream);
-                    if (thisWrite < 1) {
-                        // Need to block until a WindowUpdate message is
-                        // processed for this connection
-                        synchronized (Stream.this) {
-                            try {
-                                Stream.this.wait();
-                            } catch (InterruptedException e) {
-                                // TODO: Possible shutdown?
-                            }
-                        }
-                    }
-                } while (thisWrite < 1);
-
-                // Stream.checkWindowSize() doesn't reduce the flow control
-                // window (reserveWindowSize() does) so the Stream's window
-                // needs to be reduced here.
-                decrementWindowSize(thisWrite);
-
-                // Do the write
-                handler.writeBody(Stream.this, buffer, thisWrite,
-                        !writeInProgress && closed && left == thisWrite);
-                left -= thisWrite;
-                buffer.position(buffer.position() + thisWrite);
+                int streamReservation  = reserveWindowSize(left);
+                while (streamReservation > 0) {
+                    int connectionReservation =
+                                handler.reserveWindowSize(Stream.this, 
streamReservation);
+                    // Do the write
+                    handler.writeBody(Stream.this, buffer, 
connectionReservation,
+                            !writeInProgress && closed && left == 
connectionReservation);
+                    streamReservation -= connectionReservation;
+                    left -= connectionReservation;
+                    buffer.position(buffer.position() + connectionReservation);
+
+                }
             }
             buffer.clear();
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to