Author: markt
Date: Thu May 21 15:31:43 2015
New Revision: 1680910

URL: http://svn.apache.org/r1680910
Log:
Re-thinking flow control / write coordination
WriteStateMachine no longer required (but it can be brought back from svn 
history if I change my mind again).
Connection level flow control (trickier than stream level) is still TODO

Removed:
    tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java
Modified:
    tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680910&r1=1680909&r2=1680910&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu May 21 
15:31:43 2015
@@ -35,6 +35,7 @@ abstract class AbstractStream {
     private volatile AbstractStream parentStream = null;
     private final Set<AbstractStream> childStreams = new HashSet<>();
     private volatile int weight = Constants.DEFAULT_WEIGHT;
+    private volatile long windowSize = ConnectionSettings.DEFAULT_WINDOW_SIZE;
 
     public Integer getIdentifier() {
         return identifier;
@@ -120,6 +121,35 @@ abstract class AbstractStream {
         return childStreams;
     }
 
+    /** Replaces the current flow control window size with the given value. */
+    protected void setWindowSize(long windowSize) {
+        this.windowSize = windowSize;
+    }
+
+    /** Returns the current flow control window size. */
+    protected long getWindowSize() {
+        return windowSize;
+    }
+
+    // Grows the window. NOTE(review): += on a volatile field is not atomic.
+    protected void incrementWindowSize(int increment) {
+        windowSize += increment;
+    }
+
+    /** Shrinks the flow control window by {@code decrement}. */
+    protected void decrementWindowSize(int decrement) {
+        windowSize -= decrement;
+    }
+    // Returns min(reservation, windowSize); the window itself is not modified.
+    protected int reserveWindowSize(int reservation) {
+        if (reservation > windowSize) {
+            return (int) windowSize;
+        } else {
+            return reservation;
+        }
+    }
+
+
     protected abstract Log getLog();
 
     protected abstract int getConnectionId();

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680910&r1=1680909&r2=1680910&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 
21 15:31:43 2015
@@ -32,7 +32,6 @@ import org.apache.coyote.Adapter;
 import org.apache.coyote.Response;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http2.HpackEncoder.State;
-import org.apache.coyote.http2.WriteStateMachine.WriteState;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.http.MimeHeaders;
@@ -94,7 +93,6 @@ public class Http2UpgradeHandler extends
 
     private final ConnectionSettings remoteSettings = new ConnectionSettings();
     private final ConnectionSettings localSettings = new ConnectionSettings();
-    private volatile long flowControlWindowSize = 
ConnectionSettings.DEFAULT_WINDOW_SIZE;
     private volatile int maxRemoteStreamId = 0;
 
     private HpackDecoder hpackDecoder;
@@ -103,7 +101,6 @@ public class Http2UpgradeHandler extends
 
     private final Map<Integer,Stream> streams = new HashMap<>();
 
-    private final WriteStateMachine writeStateMachine = new 
WriteStateMachine();
     private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>();
 
     public Http2UpgradeHandler(Adapter adapter) {
@@ -159,7 +156,6 @@ public class Http2UpgradeHandler extends
 
         switch(status) {
         case OPEN_READ:
-            writeStateMachine.startRead();
             // Gets set to null once the connection preface has been
             // successfully parsed.
             if (connectionPrefaceParser != null) {
@@ -181,13 +177,6 @@ public class Http2UpgradeHandler extends
             try {
                 while (processFrame()) {
                 }
-
-                // We are on a container thread. There is no more data to read
-                // so check for writes (more efficient than dispatching to a 
new
-                // thread).
-                if (writeStateMachine.endRead()) {
-                    processWrites();
-                }
             } catch (Http2Exception h2e) {
                 if (h2e.getStreamId() == 0) {
                     // Connection error
@@ -212,30 +201,28 @@ public class Http2UpgradeHandler extends
             break;
 
         case OPEN_WRITE:
-            if (writeStateMachine.startWrite()) {
-                try {
-                    processWrites();
-                } catch (Http2Exception h2e) {
-                    if (h2e.getStreamId() == 0) {
-                        // Connection error
-                        
log.warn(sm.getString("upgradeHandler.connectionError"), h2e);
-                        close(h2e);
-                        break;
-                    } else {
-                        // Stream error
-                        // TODO Reset stream
-                    }
-                } catch (IOException ioe) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("upgradeHandler.ioerror",
-                                Long.toString(connectionId)), ioe);
-                    }
-                    close();
-                    result = SocketState.CLOSED;
+            try {
+                processWrites();
+            } catch (Http2Exception h2e) {
+                if (h2e.getStreamId() == 0) {
+                    // Connection error
+                    log.warn(sm.getString("upgradeHandler.connectionError"), 
h2e);
+                    close(h2e);
                     break;
+                } else {
+                    // Stream error
+                    // TODO Reset stream
                 }
-
+            } catch (IOException ioe) {
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("upgradeHandler.ioerror",
+                            Long.toString(connectionId)), ioe);
+                }
+                close();
+                result = SocketState.CLOSED;
+                break;
             }
+
             result = SocketState.UPGRADED;
             break;
 
@@ -539,7 +526,7 @@ public class Http2UpgradeHandler extends
                     streamId, Http2Exception.PROTOCOL_ERROR);
         }
         if (streamId == 0) {
-            flowControlWindowSize += windowSizeIncrement;
+            incrementWindowSize(windowSizeIncrement);
         } else {
             Stream stream = getStream(streamId);
             if (stream == null) {
@@ -723,8 +710,7 @@ public class Http2UpgradeHandler extends
     }
 
 
-    void writeBody(Stream stream, ByteBuffer data) throws IOException {
-        data.flip();
+    void writeBody(Stream stream, ByteBuffer data, int len) throws IOException 
{
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.writeBody", 
Integer.toString(connectionId),
                     stream.getIdentifier(), 
Integer.toString(data.remaining())));
@@ -732,14 +718,15 @@ public class Http2UpgradeHandler extends
         synchronized (socketWrapper) {
             // TODO Manage window sizes
             byte[] header = new byte[9];
-            ByteUtil.setThreeBytes(header, 0, data.remaining());
+            ByteUtil.setThreeBytes(header, 0, len);
             header[3] = FRAME_TYPE_DATA;
             if (stream.getOutputBuffer().isFinished()) {
                 header[4] = FLAG_END_OF_STREAM;
             }
             ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
             socketWrapper.write(true, header, 0, header.length);
-            socketWrapper.write(true, data.array(), data.arrayOffset(), 
data.limit());
+            socketWrapper.write(true, data.array(), data.arrayOffset() + 
data.position(),
+                    len);
             socketWrapper.flush(true);
         }
     }
@@ -760,31 +747,18 @@ public class Http2UpgradeHandler extends
 
 
     private Object getThingToWrite() {
-        synchronized (writeStateMachine) {
-            // TODO This is more complicated than pulling an object off a 
queue.
+        // TODO This is more complicated than pulling an object off a queue.
 
-            // Note: The checking of the queue for something to write and the
-            //       calling of endWrite() if nothing is found must be kept
-            //       within the same sync to avoid race conditions with adding
-            //       entries to the queue.
-            Object obj = writeQueue.poll();
-            if (obj == null) {
-                 writeStateMachine.endWrite(WriteState.IDLE);
-            }
-            return obj;
-        }
+        // Note: The WriteStateMachine and its sync block have been removed,
+        //       so there is no longer an endWrite() call to pair with the
+        //       poll. writeQueue is a ConcurrentLinkedQueue, so poll() needs
+        //       no extra locking and returns null when the queue is empty.
+        return writeQueue.poll();
     }
 
 
     void addWrite(Object obj) {
-        boolean needDispatch;
-        synchronized (writeStateMachine) {
-            writeQueue.add(obj);
-            needDispatch = writeStateMachine.addWrite();
-        }
-        if (needDispatch) {
-            socketWrapper.processSocket(SocketStatus.OPEN_WRITE, true);
-        }
+        // Only queues the object. With the WriteStateMachine removed, this
+        // method no longer triggers an OPEN_WRITE dispatch itself.
+        writeQueue.add(obj);
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680910&r1=1680909&r2=1680910&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 15:31:43 
2015
@@ -38,20 +38,29 @@ public class Stream extends AbstractStre
     private final Response coyoteResponse = new Response();
     private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer();
 
-    private volatile long flowControlWindowSize;
-
 
     public Stream(Integer identifier, Http2UpgradeHandler handler) {
         super(identifier);
         this.handler = handler;
         setParentStream(handler);
-        flowControlWindowSize = 
handler.getRemoteSettings().getInitialWindowSize();
+        // Seed the window (now held by AbstractStream) from the initial
+        // window size in the handler's remote settings.
+        setWindowSize(handler.getRemoteSettings().getInitialWindowSize());
         coyoteResponse.setOutputBuffer(outputBuffer);
     }
 
 
+    @Override
     public void incrementWindowSize(int windowSizeIncrement) {
-        flowControlWindowSize += windowSizeIncrement;
+        // If this is zero then any thread that has been trying to write for
+        // this stream will be waiting. Notify that thread it can continue. Use
+        // notify all even though only one thread is waiting to be on the safe
+        // side.
+        // The notification is issued on this Stream's monitor, so a blocked
+        // writer is only woken if it waits on this same Stream instance.
+        // NOTE(review): only an exactly-zero window triggers the notify; a
+        // writer blocked on a negative window would not be woken - confirm
+        // the window cannot go negative.
+        boolean notify = getWindowSize() == 0;
+        super.incrementWindowSize(windowSizeIncrement);
+        if (notify) {
+            synchronized (this) {
+                notifyAll();
+            }
+        }
     }
 
 
@@ -176,11 +185,50 @@ public class Stream extends AbstractStre
         }
 
         public void flush() throws IOException {
+            // Writes the buffered body data via handler.writeBody(), in
+            // pieces no larger than the stream and connection flow control
+            // windows allow. sendHeaders() is called first if the response
+            // has not been committed yet.
+            if (!coyoteResponse.isCommitted()) {
+                coyoteResponse.sendHeaders();
+            }
             if (buffer.position() == 0) {
                 // Buffer is empty. Nothing to do.
                 return;
             }
-            handler.writeBody(Stream.this, buffer);
+            buffer.flip();
+            int left = buffer.remaining();
+            int thisWriteStream;
+            while (left > 0) {
+                // Flow control for the Stream
+                do {
+                    // Stream.incrementWindowSize() calls notifyAll() on the
+                    // Stream's monitor, so lock and wait on Stream.this
+                    // rather than on this output buffer. The window is
+                    // checked while holding that lock so that an update
+                    // arriving between the check and the wait is not missed.
+                    synchronized (Stream.this) {
+                        thisWriteStream = reserveWindowSize(left);
+                        if (thisWriteStream < 1) {
+                            // Need to block until a WindowUpdate message is
+                            // processed for this stream;
+                            try {
+                                Stream.this.wait();
+                            } catch (InterruptedException e) {
+                                // TODO. Possible shutdown?
+                            }
+                        }
+                    }
+                } while (thisWriteStream < 1);
+
+                // Flow control for the connection
+                int thisWrite;
+                do {
+                    thisWrite = handler.reserveWindowSize(thisWriteStream);
+                    if (thisWrite < 1) {
+                        // TODO Flow control when connection window is
+                        // exhausted
+                    }
+                } while (thisWrite < 1);
+
+                // Account for the bytes in both windows before writing.
+                decrementWindowSize(thisWrite);
+                handler.decrementWindowSize(thisWrite);
+
+                // Do the write
+                handler.writeBody(Stream.this, buffer, thisWrite);
+                left -= thisWrite;
+                // writeBody() is passed an explicit length; the position is
+                // advanced here so the next pass starts after these bytes.
+                buffer.position(buffer.position() + thisWrite);
+            }
             buffer.clear();
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to