This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 74681f56475541ff6069aadb1a88bd4d687e7446 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Sep 13 15:10:58 2021 +0100 Step 1 - merge BackLogTracker into AbstractStream --- java/org/apache/coyote/http2/AbstractStream.java | 65 +++++++++++ .../apache/coyote/http2/Http2UpgradeHandler.java | 128 ++++----------------- 2 files changed, 86 insertions(+), 107 deletions(-) diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java index c7374b6..d3195eb 100644 --- a/java/org/apache/coyote/http2/AbstractStream.java +++ b/java/org/apache/coyote/http2/AbstractStream.java @@ -40,6 +40,9 @@ abstract class AbstractStream { private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>()); private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; + volatile int remainingReservation; + volatile int unusedAllocation; + volatile boolean notifyInProgress; AbstractStream(Integer identifier) { this.identifier = identifier; @@ -157,4 +160,66 @@ abstract class AbstractStream { abstract String getConnectionId(); abstract int getWeight(); + + + /** + * @return The number of bytes requiring an allocation from the + * Connection flow control window + */ + public int getRemainingReservation() { + return remainingReservation; + } + + /** + * + * @return The number of bytes allocated from the Connection flow + * control window but not yet written + */ + public int getUnusedAllocation() { + return unusedAllocation; + } + + /** + * The purpose of this is to avoid the incorrect triggering of a timeout + * for the following sequence of events: + * <ol> + * <li>window update 1</li> + * <li>allocation 1</li> + * <li>notify 1</li> + * <li>window update 2</li> + * <li>allocation 2</li> + * <li>act on notify 1 (using allocation 1 and 2)</li> + * <li>notify 2</li> + * <li>act on notify 2 (timeout due to no allocation)</li> + * </ol> + * + * @return {@code true} if a notify has been issued but the associated + * allocation has not been used, otherwise {@code false} + */ + public boolean isNotifyInProgress() { + return notifyInProgress; + } + + public void useAllocation() { + unusedAllocation = 0; + notifyInProgress = false; + } + + public void startNotify() { + notifyInProgress = true; + } + + protected int allocate(int allocation) { + if (remainingReservation >= allocation) { + remainingReservation -= allocation; + unusedAllocation += allocation; + return 0; + } + + int left = allocation - remainingReservation; + unusedAllocation += remainingReservation; + remainingReservation = 0; + + return left; + } } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index f28ae6d..b45dad0 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -20,10 +20,9 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.TreeSet; @@ -132,7 +131,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private final AtomicInteger nextLocalStreamId = new AtomicInteger(2); private final PingManager pingManager = getPingManager(); private volatile int newStreamsSinceLastPrune = 0; - private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>(); + private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>()); private long backLogSize = 0; // The time at which the connection will timeout unless data arrives before // then. -1 means no timeout. @@ -882,21 +881,20 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH long windowSize = getWindowSize(); if (windowSize < 1 || backLogSize > 0) { // Has this stream been granted an allocation - BacklogTracker tracker = backLogStreams.get(stream); - if (tracker == null) { - tracker = new BacklogTracker(reservation); - backLogStreams.put(stream, tracker); + if (stream.remainingReservation == 0) { + backLogStreams.add(stream); + stream.remainingReservation = reservation; backLogSize += reservation; // Add the parents as well AbstractStream parent = stream.getParentStream(); - while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) { + while (parent != null && backLogStreams.add(parent)) { parent = parent.getParentStream(); } } else { - if (tracker.getUnusedAllocation() > 0) { - allocation = tracker.getUnusedAllocation(); + if (stream.getUnusedAllocation() > 0) { + allocation = stream.getUnusedAllocation(); decrementWindowSize(allocation); - if (tracker.getRemainingReservation() == 0) { + if (stream.getRemainingReservation() == 0) { // The reservation has been fully allocated // so this stream can be removed from the // backlog. @@ -905,7 +903,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // This allocation has been used. Leave the // stream on the backlog as it still has // more bytes to write. - tracker.useAllocation(); + stream.useAllocation(); } } } @@ -928,12 +926,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Has this stream been granted an allocation // Note: If the stream in not in this Map then the // requested write has been fully allocated - BacklogTracker tracker; - // Ensure allocations made in other threads are visible - synchronized (this) { - tracker = backLogStreams.get(stream); - } - if (tracker != null && tracker.getUnusedAllocation() == 0) { + if (stream.getUnusedAllocation() == 0) { String msg; Http2Error error; if (stream.isActive()) { @@ -1033,7 +1026,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH Set<AbstractStream> result = new HashSet<>(); if (backLogSize < increment) { // Can clear the whole backlog - result.addAll(backLogStreams.keySet()); + result.addAll(backLogStreams); backLogStreams.clear(); backLogSize = 0; } else { @@ -1041,13 +1034,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH while (leftToAllocate > 0) { leftToAllocate = allocate(this, leftToAllocate); } - for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) { - int allocation = entry.getValue().getUnusedAllocation(); + for (AbstractStream stream : backLogStreams) { + int allocation = stream.getUnusedAllocation(); if (allocation > 0) { backLogSize -= allocation; - if (!entry.getValue().isNotifyInProgress()) { - result.add(entry.getKey()); - entry.getValue().startNotify(); + if (!stream.isNotifyInProgress()) { + result.add(stream); + stream.startNotify(); } } } @@ -1061,10 +1054,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(), Integer.toString(allocation))); } - // Allocate to the specified stream - BacklogTracker tracker = backLogStreams.get(stream); - int leftToAllocate = tracker.allocate(allocation); + int leftToAllocate = stream.allocate(allocation); if (leftToAllocate == 0) { return 0; @@ -1078,12 +1069,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Recipients are children of the current stream that are in the // backlog. Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams()); - recipients.retainAll(backLogStreams.keySet()); + recipients.retainAll(backLogStreams); // Loop until we run out of allocation or recipients while (leftToAllocate > 0) { if (recipients.size() == 0) { - if (tracker.getUnusedAllocation() == 0) { + if (stream.getUnusedAllocation() == 0) { backLogStreams.remove(stream); } return leftToAllocate; @@ -1832,8 +1823,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (average < overheadThreshold) { // For Streams, client might only release the minimum so check // against current demand - BacklogTracker tracker = backLogStreams.get(stream); - if (tracker == null || increment < tracker.getRemainingReservation()) { + if (increment < stream.getRemainingReservation()) { // The smaller the increment, the larger the overhead increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average); } @@ -2044,80 +2034,4 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH payload = ByteBuffer.allocate(payload.capacity() * 2); } } - - - private static class BacklogTracker { - - private int remainingReservation; - private int unusedAllocation; - private boolean notifyInProgress; - - public BacklogTracker() { - } - - public BacklogTracker(int reservation) { - remainingReservation = reservation; - } - - /** - * @return The number of bytes requiring an allocation from the - * Connection flow control window - */ - public int getRemainingReservation() { - return remainingReservation; - } - - /** - * - * @return The number of bytes allocated from the Connection flow - * control window but not yet written - */ - public int getUnusedAllocation() { - return unusedAllocation; - } - - /** - * The purpose of this is to avoid the incorrect triggering of a timeout - * for the following sequence of events: - * <ol> - * <li>window update 1</li> - * <li>allocation 1</li> - * <li>notify 1</li> - * <li>window update 2</li> - * <li>allocation 2</li> - * <li>act on notify 1 (using allocation 1 and 2)</li> - * <li>notify 2</li> - * <li>act on notify 2 (timeout due to no allocation)</li> - * </ol> - * - * @return {@code true} if a notify has been issued but the associated - * allocation has not been used, otherwise {@code false} - */ - public boolean isNotifyInProgress() { - return notifyInProgress; - } - - public void useAllocation() { - unusedAllocation = 0; - notifyInProgress = false; - } - - public void startNotify() { - notifyInProgress = true; - } - - private int allocate(int allocation) { - if (remainingReservation >= allocation) { - remainingReservation -= allocation; - unusedAllocation += allocation; - return 0; - } - - int left = allocation - remainingReservation; - unusedAllocation += remainingReservation; - remainingReservation = 0; - - return left; - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org