This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new 762eb130a8 Refactor to reduce pinning in HTTP/2 code when using virtual threads 762eb130a8 is described below commit 762eb130a8608c18375a84276323f7cb65f7333a Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Jul 27 15:37:59 2023 +0100 Refactor to reduce pinning in HTTP/2 code when using virtual threads --- java/org/apache/coyote/http2/AbstractStream.java | 84 ++++--- .../apache/coyote/http2/Http2UpgradeHandler.java | 241 +++++++++++---------- java/org/apache/coyote/http2/RecycledStream.java | 1 - java/org/apache/coyote/http2/Stream.java | 84 +++---- .../coyote/http2/WindowAllocationManager.java | 27 ++- 5 files changed, 250 insertions(+), 187 deletions(-) diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java index f332b8c593..d6fb8d8280 100644 --- a/java/org/apache/coyote/http2/AbstractStream.java +++ b/java/org/apache/coyote/http2/AbstractStream.java @@ -16,6 +16,10 @@ */ package org.apache.coyote.http2; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.res.StringManager; @@ -33,6 +37,8 @@ abstract class AbstractStream { private final String idAsString; private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; + protected final Lock windowAllocationLock = new ReentrantLock(); + protected final Condition windowAllocationAvailable = windowAllocationLock.newCondition(); private volatile int connectionAllocationRequested = 0; private volatile int connectionAllocationMade = 0; @@ -59,13 +65,23 @@ abstract class AbstractStream { } - final synchronized void setWindowSize(long windowSize) { - this.windowSize = windowSize; + final void setWindowSize(long windowSize) { + windowAllocationLock.lock(); + try { + this.windowSize = windowSize; + } finally { + windowAllocationLock.unlock(); + } } - final synchronized long getWindowSize() { - return windowSize; + final long getWindowSize() { + windowAllocationLock.lock(); + try { + return windowSize; + } finally { + windowAllocationLock.unlock(); + } } @@ -76,37 +92,47 @@ abstract class AbstractStream { * * @throws Http2Exception If the window size is now higher than the maximum allowed */ - synchronized void incrementWindowSize(int increment) throws Http2Exception { - // No need for overflow protection here. - // Increment can't be more than Integer.MAX_VALUE and once windowSize - // goes beyond 2^31-1 an error is triggered. - windowSize += increment; - - if (log.isDebugEnabled()) { - log.debug(sm.getString("abstractStream.windowSizeInc", getConnectionId(), getIdAsString(), - Integer.toString(increment), Long.toString(windowSize))); - } + void incrementWindowSize(int increment) throws Http2Exception { + windowAllocationLock.lock(); + try { + // No need for overflow protection here. + // Increment can't be more than Integer.MAX_VALUE and once windowSize + // goes beyond 2^31-1 an error is triggered. + windowSize += increment; + + if (log.isDebugEnabled()) { + log.debug(sm.getString("abstractStream.windowSizeInc", getConnectionId(), getIdAsString(), + Integer.toString(increment), Long.toString(windowSize))); + } - if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) { - String msg = sm.getString("abstractStream.windowSizeTooBig", getConnectionId(), identifier, - Integer.toString(increment), Long.toString(windowSize)); - if (identifier.intValue() == 0) { - throw new ConnectionException(msg, Http2Error.FLOW_CONTROL_ERROR); - } else { - throw new StreamException(msg, Http2Error.FLOW_CONTROL_ERROR, identifier.intValue()); + if (windowSize > ConnectionSettingsBase.MAX_WINDOW_SIZE) { + String msg = sm.getString("abstractStream.windowSizeTooBig", getConnectionId(), identifier, + Integer.toString(increment), Long.toString(windowSize)); + if (identifier.intValue() == 0) { + throw new ConnectionException(msg, Http2Error.FLOW_CONTROL_ERROR); + } else { + throw new StreamException(msg, Http2Error.FLOW_CONTROL_ERROR, identifier.intValue()); + } } + } finally { + windowAllocationLock.unlock(); } } - final synchronized void decrementWindowSize(int decrement) { - // No need for overflow protection here. Decrement can never be larger - // the Integer.MAX_VALUE and once windowSize goes negative no further - // decrements are permitted - windowSize -= decrement; - if (log.isDebugEnabled()) { - log.debug(sm.getString("abstractStream.windowSizeDec", getConnectionId(), getIdAsString(), - Integer.toString(decrement), Long.toString(windowSize))); + final void decrementWindowSize(int decrement) { + windowAllocationLock.lock(); + try { + // No need for overflow protection here. Decrement can never be larger + // the Integer.MAX_VALUE and once windowSize goes negative no further + // decrements are permitted + windowSize -= decrement; + if (log.isDebugEnabled()) { + log.debug(sm.getString("abstractStream.windowSizeDec", getConnectionId(), getIdAsString(), + Integer.toString(decrement), Long.toString(windowSize))); + } + } finally { + windowAllocationLock.unlock(); } } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index 73ddc28f3f..e648d4dace 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -929,8 +929,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Need to be holding the stream lock so releaseBacklog() can't notify // this thread until after this thread enters wait() int allocation = 0; - synchronized (stream) { - synchronized (this) { + stream.windowAllocationLock.lock(); + try { + windowAllocationLock.lock(); + try { if (!stream.canWrite()) { stream.doStreamCancel( sm.getString("upgradeHandler.stream.notWritable", stream.getConnectionId(), @@ -955,6 +957,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH allocation = reservation; decrementWindowSize(allocation); } + } finally { + windowAllocationLock.unlock(); } if (allocation == 0) { if (block) { @@ -1001,18 +1005,19 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH return 0; } } + } finally { + stream.windowAllocationLock.unlock(); } return allocation; } - @SuppressWarnings("sync-override") // notify() needs to be outside sync - // to avoid deadlock @Override protected void incrementWindowSize(int increment) throws Http2Exception { Set<AbstractStream> streamsToNotify = null; - synchronized (this) { + windowAllocationLock.lock(); + try { long windowSize = getWindowSize(); if (windowSize < 1 && windowSize + increment > 0) { // Connection window is exhausted. Assume there will be streams @@ -1021,6 +1026,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } else { super.incrementWindowSize(increment); } + } finally { + windowAllocationLock.unlock(); } if (streamsToNotify != null) { @@ -1053,134 +1060,144 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } - private synchronized Set<AbstractStream> releaseBackLog(int increment) throws Http2Exception { - Set<AbstractStream> result = new HashSet<>(); - if (backLogSize < increment) { - // Can clear the whole backlog - for (AbstractStream stream : backLogStreams) { - if (stream.getConnectionAllocationRequested() > 0) { - stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested()); - stream.setConnectionAllocationRequested(0); - result.add(stream); + private Set<AbstractStream> releaseBackLog(int increment) throws Http2Exception { + windowAllocationLock.lock(); + try { + Set<AbstractStream> result = new HashSet<>(); + if (backLogSize < increment) { + // Can clear the whole backlog + for (AbstractStream stream : backLogStreams) { + if (stream.getConnectionAllocationRequested() > 0) { + stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested()); + stream.setConnectionAllocationRequested(0); + result.add(stream); + } } - } - // Cast is safe due to test above - int remaining = increment - (int) backLogSize; - backLogSize = 0; - super.incrementWindowSize(remaining); + // Cast is safe due to test above + int remaining = increment - (int) backLogSize; + backLogSize = 0; + super.incrementWindowSize(remaining); - backLogStreams.clear(); - } else { - // Can't clear the whole backlog. - // Need streams in priority order - Set<Stream> orderedStreams = new ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency) - .thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt)); - orderedStreams.addAll(backLogStreams); - - // Iteration 1. Need to work out how much we can clear. - long urgencyWhereAllocationIsExhausted = 0; - long requestedAllocationForIncrementalStreams = 0; - int remaining = increment; - Iterator<Stream> orderedStreamsIterator = orderedStreams.iterator(); - while (orderedStreamsIterator.hasNext()) { - Stream s = orderedStreamsIterator.next(); - if (urgencyWhereAllocationIsExhausted < s.getUrgency()) { - if (remaining < 1) { - break; + backLogStreams.clear(); + } else { + // Can't clear the whole backlog. + // Need streams in priority order + Set<Stream> orderedStreams = new ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency) + .thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt)); + orderedStreams.addAll(backLogStreams); + + // Iteration 1. Need to work out how much we can clear. + long urgencyWhereAllocationIsExhausted = 0; + long requestedAllocationForIncrementalStreams = 0; + int remaining = increment; + Iterator<Stream> orderedStreamsIterator = orderedStreams.iterator(); + while (orderedStreamsIterator.hasNext()) { + Stream s = orderedStreamsIterator.next(); + if (urgencyWhereAllocationIsExhausted < s.getUrgency()) { + if (remaining < 1) { + break; + } + requestedAllocationForIncrementalStreams = 0; } - requestedAllocationForIncrementalStreams = 0; - } - urgencyWhereAllocationIsExhausted = s.getUrgency(); - if (s.getIncremental()) { - requestedAllocationForIncrementalStreams += s.getConnectionAllocationRequested(); - remaining -= s.getConnectionAllocationRequested(); - } else { - remaining -= s.getConnectionAllocationRequested(); - if (remaining < 1) { - break; + urgencyWhereAllocationIsExhausted = s.getUrgency(); + if (s.getIncremental()) { + requestedAllocationForIncrementalStreams += s.getConnectionAllocationRequested(); + remaining -= s.getConnectionAllocationRequested(); + } else { + remaining -= s.getConnectionAllocationRequested(); + if (remaining < 1) { + break; + } } } - } - // Iteration 2. Allocate. - // Reset for second iteration - remaining = increment; - orderedStreamsIterator = orderedStreams.iterator(); - while (orderedStreamsIterator.hasNext()) { - Stream s = orderedStreamsIterator.next(); - if (s.getUrgency() < urgencyWhereAllocationIsExhausted) { - // Can fully allocate - remaining = allocate(s, remaining); - result.add(s); - orderedStreamsIterator.remove(); - backLogStreams.remove(s); - } else if (requestedAllocationForIncrementalStreams == 0) { - // Allocation ran out in non-incremental streams so fully - // allocate in iterator order until allocation is exhausted - remaining = allocate(s, remaining); - result.add(s); - if (s.getConnectionAllocationRequested() == 0) { - // Fully allocated + // Iteration 2. Allocate. + // Reset for second iteration + remaining = increment; + orderedStreamsIterator = orderedStreams.iterator(); + while (orderedStreamsIterator.hasNext()) { + Stream s = orderedStreamsIterator.next(); + if (s.getUrgency() < urgencyWhereAllocationIsExhausted) { + // Can fully allocate + remaining = allocate(s, remaining); + result.add(s); orderedStreamsIterator.remove(); backLogStreams.remove(s); - } - if (remaining < 1) { - break; - } - } else { - // Allocation ran out in incremental streams. Distribute - // remaining allocation between the incremental streams at - // this urgency level. - if (s.getUrgency() != urgencyWhereAllocationIsExhausted) { - break; - } + } else if (requestedAllocationForIncrementalStreams == 0) { + // Allocation ran out in non-incremental streams so fully + // allocate in iterator order until allocation is exhausted + remaining = allocate(s, remaining); + result.add(s); + if (s.getConnectionAllocationRequested() == 0) { + // Fully allocated + orderedStreamsIterator.remove(); + backLogStreams.remove(s); + } + if (remaining < 1) { + break; + } + } else { + // Allocation ran out in incremental streams. Distribute + // remaining allocation between the incremental streams at + // this urgency level. + if (s.getUrgency() != urgencyWhereAllocationIsExhausted) { + break; + } - int share = (int) (s.getConnectionAllocationRequested() * remaining / - requestedAllocationForIncrementalStreams); - if (share == 0) { - share = 1; - } - allocate(s, share); - result.add(s); - if (s.getConnectionAllocationRequested() == 0) { - // Fully allocated (unlikely but possible due to - // rounding if only a few bytes required). - orderedStreamsIterator.remove(); - backLogStreams.remove(s); + int share = (int) (s.getConnectionAllocationRequested() * remaining / + requestedAllocationForIncrementalStreams); + if (share == 0) { + share = 1; + } + allocate(s, share); + result.add(s); + if (s.getConnectionAllocationRequested() == 0) { + // Fully allocated (unlikely but possible due to + // rounding if only a few bytes required). + orderedStreamsIterator.remove(); + backLogStreams.remove(s); + } } } } + return result; + } finally { + windowAllocationLock.unlock(); } - return result; } - private synchronized int allocate(AbstractStream stream, int allocation) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(), - Integer.toString(allocation))); - } + private int allocate(AbstractStream stream, int allocation) { + windowAllocationLock.lock(); + try { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(), + Integer.toString(allocation))); + } - int leftToAllocate = allocation; + int leftToAllocate = allocation; - if (stream.getConnectionAllocationRequested() > 0) { - int allocatedThisTime; - if (allocation >= stream.getConnectionAllocationRequested()) { - allocatedThisTime = stream.getConnectionAllocationRequested(); - } else { - allocatedThisTime = allocation; + if (stream.getConnectionAllocationRequested() > 0) { + int allocatedThisTime; + if (allocation >= stream.getConnectionAllocationRequested()) { + allocatedThisTime = stream.getConnectionAllocationRequested(); + } else { + allocatedThisTime = allocation; + } + stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested() - allocatedThisTime); + stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + allocatedThisTime); + leftToAllocate = leftToAllocate - allocatedThisTime; } - stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested() - allocatedThisTime); - stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + allocatedThisTime); - leftToAllocate = leftToAllocate - allocatedThisTime; - } - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.allocate.left", getConnectionId(), stream.getIdAsString(), - Integer.toString(leftToAllocate))); - } + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.allocate.left", getConnectionId(), stream.getIdAsString(), + Integer.toString(leftToAllocate))); + } - return leftToAllocate; + return leftToAllocate; + } finally { + windowAllocationLock.unlock(); + } } diff --git a/java/org/apache/coyote/http2/RecycledStream.java b/java/org/apache/coyote/http2/RecycledStream.java index 143ae1d371..c4c180ac1f 100644 --- a/java/org/apache/coyote/http2/RecycledStream.java +++ b/java/org/apache/coyote/http2/RecycledStream.java @@ -40,7 +40,6 @@ class RecycledStream extends AbstractNonZeroStream { } - @SuppressWarnings("sync-override") @Override void incrementWindowSize(int increment) throws Http2Exception { // NO-OP diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index ff1f24ff12..1a4ab09743 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -216,52 +216,62 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { @Override - final synchronized void incrementWindowSize(int windowSizeIncrement) throws Http2Exception { - // If this is zero then any thread that has been trying to write for - // this stream will be waiting. Notify that thread it can continue. Use - // notify all even though only one thread is waiting to be on the safe - // side. - boolean notify = getWindowSize() < 1; - super.incrementWindowSize(windowSizeIncrement); - if (notify && getWindowSize() > 0) { - allocationManager.notifyStream(); + final void incrementWindowSize(int windowSizeIncrement) throws Http2Exception { + windowAllocationLock.lock(); + try { + // If this is zero then any thread that has been trying to write for + // this stream will be waiting. Notify that thread it can continue. Use + // notify all even though only one thread is waiting to be on the safe + // side. + boolean notify = getWindowSize() < 1; + super.incrementWindowSize(windowSizeIncrement); + if (notify && getWindowSize() > 0) { + allocationManager.notifyStream(); + } + } finally { + windowAllocationLock.unlock(); } } - final synchronized int reserveWindowSize(int reservation, boolean block) throws IOException { - long windowSize = getWindowSize(); - while (windowSize < 1) { - if (!canWrite()) { - throw new CloseNowException(sm.getString("stream.notWritable", getConnectionId(), getIdAsString())); - } - if (block) { - try { - long writeTimeout = handler.getProtocol().getStreamWriteTimeout(); - allocationManager.waitForStream(writeTimeout); - windowSize = getWindowSize(); - if (windowSize == 0) { - doStreamCancel(sm.getString("stream.writeTimeout"), Http2Error.ENHANCE_YOUR_CALM); + final int reserveWindowSize(int reservation, boolean block) throws IOException { + windowAllocationLock.lock(); + try { + long windowSize = getWindowSize(); + while (windowSize < 1) { + if (!canWrite()) { + throw new CloseNowException(sm.getString("stream.notWritable", getConnectionId(), getIdAsString())); + } + if (block) { + try { + long writeTimeout = handler.getProtocol().getStreamWriteTimeout(); + allocationManager.waitForStream(writeTimeout); + windowSize = getWindowSize(); + if (windowSize == 0) { + doStreamCancel(sm.getString("stream.writeTimeout"), Http2Error.ENHANCE_YOUR_CALM); + } + } catch (InterruptedException e) { + // Possible shutdown / rst or similar. Use an IOException to + // signal to the client that further I/O isn't possible for this + // Stream. + throw new IOException(e); } - } catch (InterruptedException e) { - // Possible shutdown / rst or similar. Use an IOException to - // signal to the client that further I/O isn't possible for this - // Stream. - throw new IOException(e); + } else { + allocationManager.waitForStreamNonBlocking(); + return 0; } + } + int allocation; + if (windowSize < reservation) { + allocation = (int) windowSize; } else { - allocationManager.waitForStreamNonBlocking(); - return 0; + allocation = reservation; } + decrementWindowSize(allocation); + return allocation; + } finally { + windowAllocationLock.unlock(); } - int allocation; - if (windowSize < reservation) { - allocation = (int) windowSize; - } else { - allocation = reservation; - } - decrementWindowSize(allocation); - return allocation; } diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java b/java/org/apache/coyote/http2/WindowAllocationManager.java index e784c4083c..811fe1821e 100644 --- a/java/org/apache/coyote/http2/WindowAllocationManager.java +++ b/java/org/apache/coyote/http2/WindowAllocationManager.java @@ -129,14 +129,18 @@ class WindowAllocationManager { private boolean isWaitingFor(int waitTarget) { - synchronized (stream) { + stream.windowAllocationLock.lock(); + try { return (waitingFor & waitTarget) > 0; + } finally { + stream.windowAllocationLock.unlock(); } } private void waitFor(int waitTarget, final long timeout) throws InterruptedException { - synchronized (stream) { + stream.windowAllocationLock.lock(); + try { if (waitingFor != NONE) { throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", stream.getConnectionId(), stream.getIdAsString())); @@ -148,7 +152,7 @@ class WindowAllocationManager { // Loop to handle spurious wake-ups do { if (timeout < 0) { - stream.wait(); + stream.windowAllocationAvailable.await(); } else { long timeoutRemaining; if (startNanos == -1) { @@ -164,15 +168,18 @@ class WindowAllocationManager { return; } } - stream.wait(timeoutRemaining); + stream.windowAllocationAvailable.await(timeoutRemaining, TimeUnit.MILLISECONDS); } } while (waitingFor != NONE); + } finally { + stream.windowAllocationLock.unlock(); } } private void waitForNonBlocking(int waitTarget) { - synchronized (stream) { + stream.windowAllocationLock.lock(); + try { if (waitingFor == NONE) { waitingFor = waitTarget; } else if (waitingFor == waitTarget) { @@ -182,14 +189,16 @@ class WindowAllocationManager { throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", stream.getConnectionId(), stream.getIdAsString())); } - + } finally { + stream.windowAllocationLock.unlock(); } } private void notify(int notifyTarget) { - synchronized (stream) { + stream.windowAllocationLock.lock(); + try { if (log.isDebugEnabled()) { log.debug(sm.getString("windowAllocationManager.notify", stream.getConnectionId(), stream.getIdAsString(), Integer.toString(waitingFor), Integer.toString(notifyTarget))); @@ -210,7 +219,7 @@ class WindowAllocationManager { log.debug(sm.getString("windowAllocationManager.notified", stream.getConnectionId(), stream.getIdAsString())); } - stream.notify(); + stream.windowAllocationAvailable.signal(); } else { // Non-blocking so dispatch if (log.isDebugEnabled()) { @@ -225,6 +234,8 @@ class WindowAllocationManager { } } } + } finally { + stream.windowAllocationLock.unlock(); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org