This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit aca188a85b607f5fad3457619ed453ba0b7180a7 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Sep 13 17:28:16 2021 +0100 Revert "Step 1 - merge BackLogTracker into AbstractStream" This reverts commit 74681f56475541ff6069aadb1a88bd4d687e7446. --- java/org/apache/coyote/http2/AbstractStream.java | 65 ----------- .../apache/coyote/http2/Http2UpgradeHandler.java | 128 +++++++++++++++++---- 2 files changed, 107 insertions(+), 86 deletions(-) diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java index d3195eb..c7374b6 100644 --- a/java/org/apache/coyote/http2/AbstractStream.java +++ b/java/org/apache/coyote/http2/AbstractStream.java @@ -40,9 +40,6 @@ abstract class AbstractStream { private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>()); private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; - volatile int remainingReservation; - volatile int unusedAllocation; - volatile boolean notifyInProgress; AbstractStream(Integer identifier) { this.identifier = identifier; @@ -160,66 +157,4 @@ abstract class AbstractStream { abstract String getConnectionId(); abstract int getWeight(); - - - /** - * @return The number of bytes requiring an allocation from the - * Connection flow control window - */ - public int getRemainingReservation() { - return remainingReservation; - } - - /** - * - * @return The number of bytes allocated from the Connection flow - * control window but not yet written - */ - public int getUnusedAllocation() { - return unusedAllocation; - } - - /** - * The purpose of this is to avoid the incorrect triggering of a timeout - * for the following sequence of events: - * <ol> - * <li>window update 1</li> - * <li>allocation 1</li> - * <li>notify 1</li> - * <li>window update 2</li> - * <li>allocation 2</li> - * <li>act on notify 1 (using allocation 1 and 2)</li> - * <li>notify 2</li> - * <li>act on notify 2 (timeout due to no allocation)</li> - * </ol> - * - * @return {@code true} if a notify has been issued but the associated - * allocation has not been used, otherwise {@code false} - */ - public boolean isNotifyInProgress() { - return notifyInProgress; - } - - public void useAllocation() { - unusedAllocation = 0; - notifyInProgress = false; - } - - public void startNotify() { - notifyInProgress = true; - } - - protected int allocate(int allocation) { - if (remainingReservation >= allocation) { - remainingReservation -= allocation; - unusedAllocation += allocation; - return 0; - } - - int left = allocation - remainingReservation; - unusedAllocation += remainingReservation; - remainingReservation = 0; - - return left; - } } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index b45dad0..f28ae6d 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -20,9 +20,10 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.TreeSet; @@ -131,7 +132,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private final AtomicInteger nextLocalStreamId = new AtomicInteger(2); private final PingManager pingManager = getPingManager(); private volatile int newStreamsSinceLastPrune = 0; - private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>(); private long backLogSize = 0; // The time at which the connection will timeout unless data arrives before // then. -1 means no timeout. @@ -881,20 +882,21 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH long windowSize = getWindowSize(); if (windowSize < 1 || backLogSize > 0) { // Has this stream been granted an allocation - if (stream.remainingReservation == 0) { - backLogStreams.add(stream); - stream.remainingReservation = reservation; + BacklogTracker tracker = backLogStreams.get(stream); + if (tracker == null) { + tracker = new BacklogTracker(reservation); + backLogStreams.put(stream, tracker); backLogSize += reservation; // Add the parents as well AbstractStream parent = stream.getParentStream(); - while (parent != null && backLogStreams.add(parent)) { + while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) { parent = parent.getParentStream(); } } else { - if (stream.getUnusedAllocation() > 0) { - allocation = stream.getUnusedAllocation(); + if (tracker.getUnusedAllocation() > 0) { + allocation = tracker.getUnusedAllocation(); decrementWindowSize(allocation); - if (stream.getRemainingReservation() == 0) { + if (tracker.getRemainingReservation() == 0) { // The reservation has been fully allocated // so this stream can be removed from the // backlog. @@ -903,7 +905,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // This allocation has been used. Leave the // stream on the backlog as it still has // more bytes to write. - stream.useAllocation(); + tracker.useAllocation(); } } } @@ -926,7 +928,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Has this stream been granted an allocation // Note: If the stream in not in this Map then the // requested write has been fully allocated - if (stream.getUnusedAllocation() == 0) { + BacklogTracker tracker; + // Ensure allocations made in other threads are visible + synchronized (this) { + tracker = backLogStreams.get(stream); + } + if (tracker != null && tracker.getUnusedAllocation() == 0) { String msg; Http2Error error; if (stream.isActive()) { @@ -1026,7 +1033,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH Set<AbstractStream> result = new HashSet<>(); if (backLogSize < increment) { // Can clear the whole backlog - result.addAll(backLogStreams); + result.addAll(backLogStreams.keySet()); backLogStreams.clear(); backLogSize = 0; } else { @@ -1034,13 +1041,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH while (leftToAllocate > 0) { leftToAllocate = allocate(this, leftToAllocate); } - for (AbstractStream stream : backLogStreams) { - int allocation = stream.getUnusedAllocation(); + for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) { + int allocation = entry.getValue().getUnusedAllocation(); if (allocation > 0) { backLogSize -= allocation; - if (!stream.isNotifyInProgress()) { - result.add(stream); - stream.startNotify(); + if (!entry.getValue().isNotifyInProgress()) { + result.add(entry.getKey()); + entry.getValue().startNotify(); } } } @@ -1054,8 +1061,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(), Integer.toString(allocation))); } + // Allocate to the specified stream + BacklogTracker tracker = backLogStreams.get(stream); - int leftToAllocate = stream.allocate(allocation); + int leftToAllocate = tracker.allocate(allocation); if (leftToAllocate == 0) { return 0; @@ -1069,12 +1078,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Recipients are children of the current stream that are in the // backlog. Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams()); - recipients.retainAll(backLogStreams); + recipients.retainAll(backLogStreams.keySet()); // Loop until we run out of allocation or recipients while (leftToAllocate > 0) { if (recipients.size() == 0) { - if (stream.getUnusedAllocation() == 0) { + if (tracker.getUnusedAllocation() == 0) { backLogStreams.remove(stream); } return leftToAllocate; @@ -1823,7 +1832,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (average < overheadThreshold) { // For Streams, client might only release the minimum so check // against current demand - if (increment < stream.getRemainingReservation()) { + BacklogTracker tracker = backLogStreams.get(stream); + if (tracker == null || increment < tracker.getRemainingReservation()) { // The smaller the increment, the larger the overhead increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average); } @@ -2034,4 +2044,80 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH payload = ByteBuffer.allocate(payload.capacity() * 2); } } + + + private static class BacklogTracker { + + private int remainingReservation; + private int unusedAllocation; + private boolean notifyInProgress; + + public BacklogTracker() { + } + + public BacklogTracker(int reservation) { + remainingReservation = reservation; + } + + /** + * @return The number of bytes requiring an allocation from the + * Connection flow control window + */ + public int getRemainingReservation() { + return remainingReservation; + } + + /** + * + * @return The number of bytes allocated from the Connection flow + * control window but not yet written + */ + public int getUnusedAllocation() { + return unusedAllocation; + } + + /** + * The purpose of this is to avoid the incorrect triggering of a timeout + * for the following sequence of events: + * <ol> + * <li>window update 1</li> + * <li>allocation 1</li> + * <li>notify 1</li> + * <li>window update 2</li> + * <li>allocation 2</li> + * <li>act on notify 1 (using allocation 1 and 2)</li> + * <li>notify 2</li> + * <li>act on notify 2 (timeout due to no allocation)</li> + * </ol> + * + * @return {@code true} if a notify has been issued but the associated + * allocation has not been used, otherwise {@code false} + */ + public boolean isNotifyInProgress() { + return notifyInProgress; + } + + public void useAllocation() { + unusedAllocation = 0; + notifyInProgress = false; + } + + public void startNotify() { + notifyInProgress = true; + } + + private int allocate(int allocation) { + if (remainingReservation >= allocation) { + remainingReservation -= allocation; + unusedAllocation += allocation; + return 0; + } + + int left = allocation - remainingReservation; + unusedAllocation += remainingReservation; + remainingReservation = 0; + + return left; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org