Author: markt Date: Thu Jun 25 14:12:08 2015 New Revision: 1687527 URL: http://svn.apache.org/r1687527 Log: Refactor the backlog/window size syncs after FindBugs highlighted a couple of timing issues.
Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Stream.java Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1687527&r1=1687526&r2=1687527&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu Jun 25 14:12:08 2015 @@ -36,7 +36,6 @@ abstract class AbstractStream { private volatile AbstractStream parentStream = null; private final Set<AbstractStream> childStreams = new HashSet<>(); private long windowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE; - private final Object windowSizeLock = new Object(); public Integer getIdentifier() { return identifier; @@ -95,17 +94,13 @@ abstract class AbstractStream { } - protected void setWindowSize(long windowSize) { - synchronized (windowSizeLock) { - this.windowSize = windowSize; - } + protected synchronized void setWindowSize(long windowSize) { + this.windowSize = windowSize; } - protected long getWindowSize() { - synchronized (windowSizeLock) { - return windowSize; - } + protected synchronized long getWindowSize() { + return windowSize; } @@ -113,32 +108,28 @@ abstract class AbstractStream { * @param increment * @throws Http2Exception */ - protected void incrementWindowSize(int increment) throws Http2Exception { - synchronized (windowSizeLock) { - // Overflow protection - if (Long.MAX_VALUE - increment < windowSize) { - windowSize = Long.MAX_VALUE; - } else { - windowSize += increment; - } - if (log.isDebugEnabled()) { - log.debug(sm.getString("abstractStream.windowSizeInc", getConnectionId(), - getIdentifier(), Integer.toString(increment), Long.toString(windowSize))); - } + protected synchronized void incrementWindowSize(int increment) throws Http2Exception { + // Overflow protection + if (Long.MAX_VALUE - increment < windowSize) { + windowSize = Long.MAX_VALUE; + } else { + windowSize += increment; + } + if (log.isDebugEnabled()) { + log.debug(sm.getString("abstractStream.windowSizeInc", getConnectionId(), + getIdentifier(), Integer.toString(increment), Long.toString(windowSize))); } } - protected void decrementWindowSize(int decrement) { + protected synchronized void decrementWindowSize(int decrement) { // No need for overflow protection here. Decrement can never be larger // the Integer.MAX_VALUE and once windowSize goes negative no further // decrements are permitted - synchronized (windowSizeLock) { - windowSize -= decrement; - if (log.isDebugEnabled()) { - log.debug(sm.getString("abstractStream.windowSizeDec", getConnectionId(), - getIdentifier(), Integer.toString(decrement), Long.toString(windowSize))); - } + windowSize -= decrement; + if (log.isDebugEnabled()) { + log.debug(sm.getString("abstractStream.windowSizeDec", getConnectionId(), + getIdentifier(), Integer.toString(decrement), Long.toString(windowSize))); } } Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1687527&r1=1687526&r2=1687527&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Jun 25 14:12:08 2015 @@ -124,7 +124,6 @@ public class Http2UpgradeHandler extends private volatile int maxProcessedStreamId; // Tracking for when the connection is blocked (windowSize < 1) - private final Object backLogLock = new Object(); private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>(); private long backLogSize = 0; @@ -486,51 +485,64 @@ public class Http2UpgradeHandler extends } - int reserveWindowSize(Stream stream, int toWrite) { - int result; - synchronized (backLogLock) { - long windowSize = getWindowSize(); - if (windowSize < 1 || backLogSize > 0) { - // Has this stream been granted an allocation - int[] value = backLogStreams.remove(stream); - if (value != null && value[1] > 0) { - result = value[1]; - } else { - value = new int[] { toWrite, 0 }; - backLogStreams.put(stream, value); - backLogSize += toWrite; - // Add the parents as well - AbstractStream parent = stream.getParentStream(); - while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) { - parent = parent.getParentStream(); + int reserveWindowSize(Stream stream, int reservation) { + // Need to be holding the stream lock so releaseBacklog() can't notify + // this thread until after this thread enters wait() + int allocation = 0; + synchronized (stream) { + do { + synchronized (this) { + long windowSize = getWindowSize(); + if (windowSize < 1 || backLogSize > 0) { + // Has this stream been granted an allocation + int[] value = backLogStreams.remove(stream); + if (value != null && value[1] > 0) { + allocation = value[1]; + decrementWindowSize(allocation); + } else { + value = new int[] { reservation, 0 }; + backLogStreams.put(stream, value); + backLogSize += reservation; + // Add the parents as well + AbstractStream parent = stream.getParentStream(); + while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) { + parent = parent.getParentStream(); + } + } + } else if (windowSize < reservation) { + allocation = (int) windowSize; + decrementWindowSize(allocation); + } else { + allocation = reservation; + decrementWindowSize(allocation); } - result = 0; } - } else if (windowSize < toWrite) { - result = (int) windowSize; - } else { - result = toWrite; - } - decrementWindowSize(result); + if (allocation == 0) { + try { + stream.wait(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } while (allocation == 0); } - return result; + return allocation; } @Override - protected void incrementWindowSize(int increment) throws Http2Exception { - synchronized (backLogLock) { - long windowSize = getWindowSize(); - if (windowSize < 1 && windowSize + increment > 0) { - releaseBackLog(increment); - } - super.incrementWindowSize(increment); + protected synchronized void incrementWindowSize(int increment) throws Http2Exception { + long windowSize = getWindowSize(); + if (windowSize < 1 && windowSize + increment > 0) { + releaseBackLog(increment); } + super.incrementWindowSize(increment); } - private void releaseBackLog(int increment) { + private synchronized void releaseBackLog(int increment) { if (backLogSize < increment) { // Can clear the whole backlog for (AbstractStream stream : backLogStreams.keySet()) { Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1687527&r1=1687526&r2=1687527&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu Jun 25 14:12:08 2015 @@ -119,28 +119,37 @@ public class Stream extends AbstractStre @Override - public void incrementWindowSize(int windowSizeIncrement) throws Http2Exception { + public synchronized void incrementWindowSize(int windowSizeIncrement) throws Http2Exception { // If this is zero then any thread that has been trying to write for // this stream will be waiting. Notify that thread it can continue. Use // notify all even though only one thread is waiting to be on the safe // side. - boolean notify = getWindowSize() == 0; + boolean notify = getWindowSize() < 1; super.incrementWindowSize(windowSizeIncrement); - if (notify) { - synchronized (this) { - notifyAll(); - } + if (notify && getWindowSize() > 0) { + notifyAll(); } } - private int checkWindowSize(int reservation) { + private synchronized int reserveWindowSize(int reservation) { long windowSize = getWindowSize(); - if (reservation > windowSize) { - return (int) windowSize; + while (windowSize < 1) { + try { + wait(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + } + windowSize = getWindowSize(); + } + int allocation; + if (windowSize < reservation) { + allocation = (int) windowSize; } else { - return reservation; + allocation = reservation; } + decrementWindowSize(allocation); + return allocation; } @@ -277,8 +286,13 @@ public class Stream extends AbstractStre private volatile long written = 0; private volatile boolean closed = false; + /* The write methods are synchronized to ensure that only one thread at + * a time is able to access the buffer. Without this protection, a + * client that performed concurrent writes could corrupt the buffer. + */ + @Override - public int doWrite(ByteChunk chunk) throws IOException { + public synchronized int doWrite(ByteChunk chunk) throws IOException { if (closed) { // TODO i18n throw new IllegalStateException(); @@ -300,11 +314,11 @@ public class Stream extends AbstractStre return offset; } - public void flush() throws IOException { + public synchronized void flush() throws IOException { flush(false); } - private void flush(boolean writeInProgress) throws IOException { + private synchronized void flush(boolean writeInProgress) throws IOException { if (!coyoteResponse.isCommitted()) { coyoteResponse.sendHeaders(); } @@ -314,51 +328,19 @@ public class Stream extends AbstractStre } buffer.flip(); int left = buffer.remaining(); - int thisWriteStream; while (left > 0) { - // Flow control for the Stream - do { - thisWriteStream = checkWindowSize(left); - if (thisWriteStream < 1) { - // Need to block until a WindowUpdate message is - // processed for this stream - synchronized (Stream.this) { - try { - Stream.this.wait(); - } catch (InterruptedException e) { - // TODO: Possible shutdown? - } - } - } - } while (thisWriteStream < 1); - - // Flow control for the connection - int thisWrite; - do { - thisWrite = handler.reserveWindowSize(Stream.this, thisWriteStream); - if (thisWrite < 1) { - // Need to block until a WindowUpdate message is - // processed for this connection - synchronized (Stream.this) { - try { - Stream.this.wait(); - } catch (InterruptedException e) { - // TODO: Possible shutdown? - } - } - } - } while (thisWrite < 1); - - // Stream.checkWindowSize() doesn't reduce the flow control - // window (reserveWindowSize() does) so the Stream's window - // needs to be reduced here. - decrementWindowSize(thisWrite); - - // Do the write - handler.writeBody(Stream.this, buffer, thisWrite, - !writeInProgress && closed && left == thisWrite); - left -= thisWrite; - buffer.position(buffer.position() + thisWrite); + int streamReservation = reserveWindowSize(left); + while (streamReservation > 0) { + int connectionReservation = + handler.reserveWindowSize(Stream.this, streamReservation); + // Do the write + handler.writeBody(Stream.this, buffer, connectionReservation, + !writeInProgress && closed && left == connectionReservation); + streamReservation -= connectionReservation; + left -= connectionReservation; + buffer.position(buffer.position() + connectionReservation); + + } } buffer.clear(); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org