This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/10.1.x by this push:
     new 2afae300c9 Make counting of active streams more robust
2afae300c9 is described below

commit 2afae300c9ac9c0e516e2e9de580847d925365c3
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed May 8 08:36:43 2024 +0100

    Make counting of active streams more robust
---
 .../coyote/http2/Http2AsyncUpgradeHandler.java       |  2 +-
 .../org/apache/coyote/http2/Http2UpgradeHandler.java | 20 ++++++++++----------
 java/org/apache/coyote/http2/Stream.java             | 16 ++++++++++++++++
 webapps/docs/changelog.xml                           |  4 ++++
 4 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java b/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
index 2d873b049d..ea046d7b6a 100644
--- a/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
@@ -157,7 +157,7 @@ public class Http2AsyncUpgradeHandler extends Http2UpgradeHandler {
                 boolean active = state.isActive();
                 state.sendReset();
                 if (active) {
-                    decrementActiveRemoteStreamCount();
+                    decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
                 }
             }
 
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 3e6383ba17..3a87692e2d 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -291,8 +291,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     }
 
 
-    protected void decrementActiveRemoteStreamCount() {
-        setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet());
+    protected void decrementActiveRemoteStreamCount(Stream stream) {
+        setConnectionTimeoutForStreamCount(stream.decrementAndGetActiveRemoteStreamCount());
     }
 
 
@@ -599,7 +599,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                 boolean active = state.isActive();
                 state.sendReset();
                 if (active) {
-                    decrementActiveRemoteStreamCount();
+                    decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
                 }
             }
             socketWrapper.write(true, rstFrame, 0, rstFrame.length);
@@ -844,7 +844,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     protected void sentEndOfStream(Stream stream) {
         stream.sentEndOfStream();
         if (!stream.isActive()) {
-            decrementActiveRemoteStreamCount();
+            decrementActiveRemoteStreamCount(stream);
         }
     }
 
@@ -1227,7 +1227,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     }
 
 
-    private Stream getStream(int streamId) {
+    Stream getStream(int streamId) {
         Integer key = Integer.valueOf(streamId);
         AbstractStream result = streams.get(key);
         if (result instanceof Stream) {
@@ -1596,6 +1596,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             Stream stream = getStream(streamId, false);
             if (stream == null) {
                 stream = createRemoteStream(streamId);
+                activeRemoteStreamCount.incrementAndGet();
             }
             if (streamId < maxActiveRemoteStreamId) {
                 throw new ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId),
@@ -1674,9 +1675,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             Stream stream = (Stream) abstractNonZeroStream;
             if (stream.isActive()) {
                 if (stream.receivedEndOfHeaders()) {
-
-                    if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) {
-                        decrementActiveRemoteStreamCount();
+                    if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.get()) {
+                        decrementActiveRemoteStreamCount(stream);
                         // Ignoring maxConcurrentStreams increases the overhead count
                         increaseOverheadCount(FrameType.HEADERS);
                         throw new StreamException(
@@ -1720,7 +1720,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     private void receivedEndOfStream(Stream stream) throws ConnectionException {
         stream.receivedEndOfStream();
         if (!stream.isActive()) {
-            decrementActiveRemoteStreamCount();
+            decrementActiveRemoteStreamCount(stream);
         }
     }
 
@@ -1746,7 +1746,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             boolean active = stream.isActive();
             stream.receiveReset(errorCode);
             if (active) {
-                decrementActiveRemoteStreamCount();
+                decrementActiveRemoteStreamCount(stream);
             }
         }
     }
diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java
index 4fec394f35..a72bba3a7b 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -91,6 +92,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter {
     private final StreamInputBuffer inputBuffer;
     private final StreamOutputBuffer streamOutputBuffer = new StreamOutputBuffer();
     private final Http2OutputBuffer http2OutputBuffer = new Http2OutputBuffer(coyoteResponse, streamOutputBuffer);
+    private final AtomicBoolean removedFromActiveCount = new AtomicBoolean(false);
 
     // State machine would be too much overhead
     private int headerState = HEADER_STATE_START;
@@ -885,6 +887,20 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter {
     }
 
 
+    int decrementAndGetActiveRemoteStreamCount() {
+        /*
+         * Protect against mis-counting of active streams. This method should only be called once per stream but since
+         * the count of active streams is used to enforce the maximum concurrent streams limit, make sure each stream is
+         * only removed from the active count exactly once.
+         */
+        if (removedFromActiveCount.compareAndSet(false, true)) {
+            return handler.activeRemoteStreamCount.decrementAndGet();
+        } else {
+            return handler.activeRemoteStreamCount.get();
+        }
+    }
+
+
     private static void push(final Http2UpgradeHandler handler, final Request request, final Stream stream)
             throws IOException {
         if (org.apache.coyote.Constants.IS_SECURITY_ENABLED) {
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index f717e59bab..445c57059c 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -161,6 +161,10 @@
         <code>Connector</code> element, similar to the <code>Executor</code>
         element, for consistency. (remm)
       </update>
+      <fix>
+        Make counting of active HTTP/2 streams per connection more robust.
+        (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Jasper">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to