Author: markt Date: Thu May 21 15:31:43 2015 New Revision: 1680910 URL: http://svn.apache.org/r1680910 Log: Re-thinking flow control / write coordination. WriteStateMachine is no longer required (but it can be brought back from svn history if I change my mind again). Connection level flow control (trickier than stream level) is still TODO.
Removed: tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Stream.java Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680910&r1=1680909&r2=1680910&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu May 21 15:31:43 2015 @@ -35,6 +35,7 @@ abstract class AbstractStream { private volatile AbstractStream parentStream = null; private final Set<AbstractStream> childStreams = new HashSet<>(); private volatile int weight = Constants.DEFAULT_WEIGHT; + private volatile long windowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE; public Integer getIdentifier() { return identifier; @@ -120,6 +121,35 @@ abstract class AbstractStream { return childStreams; } + + protected void setWindowSize(long windowSize) { + this.windowSize = windowSize; + } + + + protected long getWindowSize() { + return windowSize; + } + + + protected void incrementWindowSize(int increment) { + windowSize += increment; + } + + + protected void decrementWindowSize(int decrement) { + windowSize += decrement; + } + + protected int reserveWindowSize(int reservation) { + if (reservation > windowSize) { + return (int) windowSize; + } else { + return reservation; + } + } + + protected abstract Log getLog(); protected abstract int getConnectionId(); Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680910&r1=1680909&r2=1680910&view=diff 
============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 21 15:31:43 2015 @@ -32,7 +32,6 @@ import org.apache.coyote.Adapter; import org.apache.coyote.Response; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http2.HpackEncoder.State; -import org.apache.coyote.http2.WriteStateMachine.WriteState; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.http.MimeHeaders; @@ -94,7 +93,6 @@ public class Http2UpgradeHandler extends private final ConnectionSettings remoteSettings = new ConnectionSettings(); private final ConnectionSettings localSettings = new ConnectionSettings(); - private volatile long flowControlWindowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE; private volatile int maxRemoteStreamId = 0; private HpackDecoder hpackDecoder; @@ -103,7 +101,6 @@ public class Http2UpgradeHandler extends private final Map<Integer,Stream> streams = new HashMap<>(); - private final WriteStateMachine writeStateMachine = new WriteStateMachine(); private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>(); public Http2UpgradeHandler(Adapter adapter) { @@ -159,7 +156,6 @@ public class Http2UpgradeHandler extends switch(status) { case OPEN_READ: - writeStateMachine.startRead(); // Gets set to null once the connection preface has been // successfully parsed. if (connectionPrefaceParser != null) { @@ -181,13 +177,6 @@ public class Http2UpgradeHandler extends try { while (processFrame()) { } - - // We are on a container thread. There is no more data to read - // so check for writes (more efficient than dispatching to a new - // thread). 
- if (writeStateMachine.endRead()) { - processWrites(); - } } catch (Http2Exception h2e) { if (h2e.getStreamId() == 0) { // Connection error @@ -212,30 +201,28 @@ public class Http2UpgradeHandler extends break; case OPEN_WRITE: - if (writeStateMachine.startWrite()) { - try { - processWrites(); - } catch (Http2Exception h2e) { - if (h2e.getStreamId() == 0) { - // Connection error - log.warn(sm.getString("upgradeHandler.connectionError"), h2e); - close(h2e); - break; - } else { - // Stream error - // TODO Reset stream - } - } catch (IOException ioe) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.ioerror", - Long.toString(connectionId)), ioe); - } - close(); - result = SocketState.CLOSED; + try { + processWrites(); + } catch (Http2Exception h2e) { + if (h2e.getStreamId() == 0) { + // Connection error + log.warn(sm.getString("upgradeHandler.connectionError"), h2e); + close(h2e); break; + } else { + // Stream error + // TODO Reset stream } - + } catch (IOException ioe) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.ioerror", + Long.toString(connectionId)), ioe); + } + close(); + result = SocketState.CLOSED; + break; } + result = SocketState.UPGRADED; break; @@ -539,7 +526,7 @@ public class Http2UpgradeHandler extends streamId, Http2Exception.PROTOCOL_ERROR); } if (streamId == 0) { - flowControlWindowSize += windowSizeIncrement; + incrementWindowSize(windowSizeIncrement); } else { Stream stream = getStream(streamId); if (stream == null) { @@ -723,8 +710,7 @@ public class Http2UpgradeHandler extends } - void writeBody(Stream stream, ByteBuffer data) throws IOException { - data.flip(); + void writeBody(Stream stream, ByteBuffer data, int len) throws IOException { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.writeBody", Integer.toString(connectionId), stream.getIdentifier(), Integer.toString(data.remaining()))); @@ -732,14 +718,15 @@ public class Http2UpgradeHandler extends synchronized 
(socketWrapper) { // TODO Manage window sizes byte[] header = new byte[9]; - ByteUtil.setThreeBytes(header, 0, data.remaining()); + ByteUtil.setThreeBytes(header, 0, len); header[3] = FRAME_TYPE_DATA; if (stream.getOutputBuffer().isFinished()) { header[4] = FLAG_END_OF_STREAM; } ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); socketWrapper.write(true, header, 0, header.length); - socketWrapper.write(true, data.array(), data.arrayOffset(), data.limit()); + socketWrapper.write(true, data.array(), data.arrayOffset() + data.position(), + len); socketWrapper.flush(true); } } @@ -760,31 +747,18 @@ public class Http2UpgradeHandler extends private Object getThingToWrite() { - synchronized (writeStateMachine) { - // TODO This is more complicated than pulling an object off a queue. + // TODO This is more complicated than pulling an object off a queue. - // Note: The checking of the queue for something to write and the - // calling of endWrite() if nothing is found must be kept - // within the same sync to avoid race conditions with adding - // entries to the queue. - Object obj = writeQueue.poll(); - if (obj == null) { - writeStateMachine.endWrite(WriteState.IDLE); - } - return obj; - } + // Note: The checking of the queue for something to write and the + // calling of endWrite() if nothing is found must be kept + // within the same sync to avoid race conditions with adding + // entries to the queue. 
+ return writeQueue.poll(); } void addWrite(Object obj) { - boolean needDispatch; - synchronized (writeStateMachine) { - writeQueue.add(obj); - needDispatch = writeStateMachine.addWrite(); - } - if (needDispatch) { - socketWrapper.processSocket(SocketStatus.OPEN_WRITE, true); - } + writeQueue.add(obj); } Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680910&r1=1680909&r2=1680910&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 15:31:43 2015 @@ -38,20 +38,29 @@ public class Stream extends AbstractStre private final Response coyoteResponse = new Response(); private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer(); - private volatile long flowControlWindowSize; - public Stream(Integer identifier, Http2UpgradeHandler handler) { super(identifier); this.handler = handler; setParentStream(handler); - flowControlWindowSize = handler.getRemoteSettings().getInitialWindowSize(); + setWindowSize(handler.getRemoteSettings().getInitialWindowSize()); coyoteResponse.setOutputBuffer(outputBuffer); } + @Override public void incrementWindowSize(int windowSizeIncrement) { - flowControlWindowSize += windowSizeIncrement; + // If this is zero then any thread that has been trying to write for + // this stream will be waiting. Notify that thread it can continue. Use + // notify all even though only one thread is waiting to be on the safe + // side. 
+ boolean notify = getWindowSize() == 0; + super.incrementWindowSize(windowSizeIncrement); + if (notify) { + synchronized (this) { + notifyAll(); + } + } } @@ -176,11 +185,50 @@ public class Stream extends AbstractStre } public void flush() throws IOException { + if (!coyoteResponse.isCommitted()) { + coyoteResponse.sendHeaders(); + } if (buffer.position() == 0) { // Buffer is empty. Nothing to do. return; } - handler.writeBody(Stream.this, buffer); + buffer.flip(); + int left = buffer.remaining(); + int thisWriteStream; + while (left > 0) { + // Flow control for the Stream + do { + thisWriteStream = reserveWindowSize(left); + if (thisWriteStream < 1) { + // Need to block until a WindowUpdate message is + // processed for this stream; + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // TODO. Possible shutdown? + } + } + } + } while (thisWriteStream < 1); + + // Flow control for the connection + int thisWrite; + do { + thisWrite = handler.reserveWindowSize(thisWriteStream); + if (thisWrite < 1) { + // TODO Flow control when connection window is exhausted + } + } while (thisWrite < 1); + + decrementWindowSize(thisWrite); + handler.decrementWindowSize(thisWrite); + + // Do the write + handler.writeBody(Stream.this, buffer, thisWrite); + left -= thisWrite; + buffer.position(buffer.position() + thisWrite); + } buffer.clear(); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org