Author: markt
Date: Thu May 21 20:47:04 2015
New Revision: 1680952

URL: http://svn.apache.org/r1680952
Log:
Implement flow control if the connection runs out of capacity.
Needs some unit tests (once I figure out the best way to write them).

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Stream.java Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680952&r1=1680951&r2=1680952&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu May 21 20:47:04 2015 @@ -17,25 +17,20 @@ package org.apache.coyote.http2; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.juli.logging.Log; -import org.apache.tomcat.util.res.StringManager; /** * Used to managed prioritisation. */ abstract class AbstractStream { - private static final StringManager sm = StringManager.getManager(AbstractStream.class); - private final Integer identifier; private volatile AbstractStream parentStream = null; private final Set<AbstractStream> childStreams = new HashSet<>(); - private volatile int weight = Constants.DEFAULT_WEIGHT; private AtomicLong windowSize = new AtomicLong(ConnectionSettings.DEFAULT_WINDOW_SIZE); public Integer getIdentifier() { @@ -48,34 +43,6 @@ abstract class AbstractStream { } - public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) { - if (getLog().isDebugEnabled()) { - getLog().debug(sm.getString("abstractStream.reprioritisation.debug", - Long.toString(getConnectionId()), identifier, Boolean.toString(exclusive), - parent.getIdentifier(), Integer.toString(weight))); - } - - // Check if new parent is a descendant of this stream - if (isDescendant(parent)) { - parent.detachFromParent(); - parentStream.addChild(parent); - } - - if (exclusive) { - // Need to move children of the 
new parent to be children of this - // stream. Slightly convoluted to avoid concurrent modification. - Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator(); - while (parentsChildren.hasNext()) { - AbstractStream parentsChild = parentsChildren.next(); - parentsChildren.remove(); - this.addChild(parentsChild); - } - } - parent.addChild(this); - this.weight = weight; - } - - void detachFromParent() { if (parentStream != null) { parentStream.getChildStreams().remove(this); @@ -138,17 +105,9 @@ abstract class AbstractStream { } - protected int reserveWindowSize(int reservation) { - long windowSize = this.windowSize.get(); - if (reservation > windowSize) { - return (int) windowSize; - } else { - return reservation; - } - } - - protected abstract Log getLog(); protected abstract int getConnectionId(); + + protected abstract int getWeight(); } Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680952&r1=1680951&r2=1680952&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 21 20:47:04 2015 @@ -21,9 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.WebConnection; @@ -101,7 +104,11 @@ public class Http2UpgradeHandler extends private final Map<Integer,Stream> streams = new HashMap<>(); - private final Queue<Object> 
writeQueue = new ConcurrentLinkedQueue<>(); + // Tracking for when the connection is blocked (windowSize < 1) + private final Object backLogLock = new Object(); + private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>(); + private long backLogSize = 0; + public Http2UpgradeHandler(Adapter adapter) { super (STREAM_ID_ZERO); @@ -716,7 +723,6 @@ public class Http2UpgradeHandler extends stream.getIdentifier(), Integer.toString(data.remaining()))); } synchronized (socketWrapper) { - // TODO Manage window sizes byte[] header = new byte[9]; ByteUtil.setThreeBytes(header, 0, len); header[3] = FRAME_TYPE_DATA; @@ -737,28 +743,137 @@ public class Http2UpgradeHandler extends socketWrapper.registerWriteInterest(); return; } + } - Object obj; - while ((obj = getThingToWrite()) != null) { - // TODO - log.debug("TODO: write [" + obj.toString() + "]"); + + int reserveWindowSize(Stream stream, int toWrite) { + int result; + synchronized (backLogLock) { + long windowSize = getWindowSize(); + if (windowSize < 1 || backLogSize > 0) { + // Has this stream been granted an allocation + int[] value = backLogStreams.remove(stream); + if (value[1] > 0) { + result = value[1]; + value[0] = 0; + value[1] = 1; + } else { + value = new int[] { toWrite, 0 }; + backLogStreams.put(stream, value); + backLogSize += toWrite; + // Add the parents as well + AbstractStream parent = stream.getParentStream(); + while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) { + parent = parent.getParentStream(); + } + result = 0; + } + } else if (windowSize < toWrite) { + result = (int) windowSize; + } else { + result = toWrite; + } + incrementWindowSize(-result); } + return result; } - private Object getThingToWrite() { - // TODO This is more complicated than pulling an object off a queue. 
- // Note: The checking of the queue for something to write and the - // calling of endWrite() if nothing is found must be kept - // within the same sync to avoid race conditions with adding - // entries to the queue. - return writeQueue.poll(); + @Override + protected void incrementWindowSize(int increment) { + synchronized (backLogLock) { + if (getWindowSize() == 0) { + releaseBackLog(increment); + } + super.incrementWindowSize(increment); + } + } + + + private void releaseBackLog(int increment) { + if (backLogSize < increment) { + // Can clear the whole backlog + for (AbstractStream stream : backLogStreams.keySet()) { + synchronized (stream) { + stream.notifyAll(); + } + } + backLogStreams.clear(); + backLogSize = 0; + } else { + int leftToAllocate = increment; + while (leftToAllocate > 0) { + leftToAllocate = allocate(this, leftToAllocate); + } + allocate(this, increment); + for (Entry<AbstractStream,int[]> entry : backLogStreams.entrySet()) { + int allocation = entry.getValue()[1]; + if (allocation > 0) { + backLogSize =- allocation; + synchronized (entry.getKey()) { + entry.getKey().notifyAll(); + } + } + } + } } - void addWrite(Object obj) { - writeQueue.add(obj); + private int allocate(AbstractStream stream, int allocation) { + // Allocate to the specified stream + int[] value = backLogStreams.get(stream); + if (value[0] >= allocation) { + value[0] -= allocation; + value[1] = allocation; + return 0; + } + + // There was some left over so allocate that to the children of the + // stream. + int leftToAllocate = allocation; + value[1] = value[0]; + value[0] = 0; + leftToAllocate -= value[1]; + + // Recipients are children of the current stream that are in the + // backlog. 
+ Set<AbstractStream> recipients = new HashSet<>(); + recipients.addAll(stream.getChildStreams()); + recipients.retainAll(backLogStreams.keySet()); + + // Loop until we run out of allocation or recipients + while (leftToAllocate > 0) { + if (recipients.size() == 0) { + backLogStreams.remove(stream); + return leftToAllocate; + } + + int totalWeight = 0; + for (AbstractStream recipient : recipients) { + totalWeight += recipient.getWeight(); + } + + // Use an Iterator so fully allocated children/recipients can be + // removed. + Iterator<AbstractStream> iter = recipients.iterator(); + while (iter.hasNext()) { + AbstractStream recipient = iter.next(); + // +1 is to avoid rounding issues triggering an infinite loop. + // Will cause a very slight over allocation but HTTP/2 should + // cope with that. + int share = 1 + leftToAllocate * recipient.getWeight() / totalWeight; + int remainder = allocate(recipient, share); + // Remove recipients that receive their full allocation so that + // they are excluded from the next allocation round. 
+ if (remainder > 0) { + iter.remove(); + } + leftToAllocate -= (share - remainder); + } + } + + return 0; } @@ -801,4 +916,10 @@ public class Http2UpgradeHandler extends protected final Log getLog() { return log; } + + + @Override + protected final int getWeight() { + return 0; + } } Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680952&r1=1680951&r2=1680952&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 20:47:04 2015 @@ -18,6 +18,7 @@ package org.apache.coyote.http2; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Request; @@ -33,6 +34,8 @@ public class Stream extends AbstractStre private static final Log log = LogFactory.getLog(Stream.class); private static final StringManager sm = StringManager.getManager(Stream.class); + private volatile int weight = Constants.DEFAULT_WEIGHT; + private final Http2UpgradeHandler handler; private final Request coyoteRequest = new Request(); private final Response coyoteResponse = new Response(); @@ -48,6 +51,34 @@ public class Stream extends AbstractStre } + public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) { + if (getLog().isDebugEnabled()) { + getLog().debug(sm.getString("abstractStream.reprioritisation.debug", + Long.toString(getConnectionId()), getIdentifier(), Boolean.toString(exclusive), + parent.getIdentifier(), Integer.toString(weight))); + } + + // Check if new parent is a descendant of this stream + if (isDescendant(parent)) { + parent.detachFromParent(); + getParentStream().addChild(parent); + } + + if (exclusive) { + // Need to move children of the new parent to be children of this + // stream. 
Slightly convoluted to avoid concurrent modification. + Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator(); + while (parentsChildren.hasNext()) { + AbstractStream parentsChild = parentsChildren.next(); + parentsChildren.remove(); + this.addChild(parentsChild); + } + } + parent.addChild(this); + this.weight = weight; + } + + @Override public void incrementWindowSize(int windowSizeIncrement) { // If this is zero then any thread that has been trying to write for @@ -64,6 +95,16 @@ public class Stream extends AbstractStre } + private int reserveWindowSize(int reservation) { + long windowSize = getWindowSize(); + if (reservation > windowSize) { + return (int) windowSize; + } else { + return reservation; + } + } + + @Override public void emitHeader(String name, String value, boolean neverIndex) { if (log.isDebugEnabled()) { @@ -142,6 +183,12 @@ public class Stream extends AbstractStre } + @Override + protected int getWeight() { + return weight; + } + + Request getCoyoteRequest() { return coyoteRequest; } @@ -201,12 +248,12 @@ public class Stream extends AbstractStre thisWriteStream = reserveWindowSize(left); if (thisWriteStream < 1) { // Need to block until a WindowUpdate message is - // processed for this stream; + // processed for this stream synchronized (this) { try { wait(); } catch (InterruptedException e) { - // TODO. Possible shutdown? + // TODO: Possible shutdown? } } } @@ -215,14 +262,21 @@ public class Stream extends AbstractStre // Flow control for the connection int thisWrite; do { - thisWrite = handler.reserveWindowSize(thisWriteStream); + thisWrite = handler.reserveWindowSize(Stream.this, thisWriteStream); if (thisWrite < 1) { - // TODO Flow control when connection window is exhausted + // Need to block until a WindowUpdate message is + // processed for this connection + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // TODO: Possible shutdown? 
+ } + } } } while (thisWrite < 1); incrementWindowSize(-thisWrite); - handler.incrementWindowSize(-thisWrite); // Do the write handler.writeBody(Stream.this, buffer, thisWrite);

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org