This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit fa6df26815cea9a429291f4452711b96b00f02b0 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Sep 24 18:11:37 2020 +0100 Reduce memory footprint of closed http/2 streams This refactoring replaces closed streams with a new RecycledStream object and changes the mechanism used to look up known streams. Refactoring getStream to handle differences between Stream and RecycledStream --- .../apache/coyote/http2/AbstractNonZeroStream.java | 2 + .../apache/coyote/http2/Http2UpgradeHandler.java | 125 ++++++++++++++------- java/org/apache/coyote/http2/RecycledStream.java | 26 ++++- java/org/apache/coyote/http2/Stream.java | 20 ++-- 4 files changed, 111 insertions(+), 62 deletions(-) diff --git a/java/org/apache/coyote/http2/AbstractNonZeroStream.java b/java/org/apache/coyote/http2/AbstractNonZeroStream.java index 70cfe35..582ab1e 100644 --- a/java/org/apache/coyote/http2/AbstractNonZeroStream.java +++ b/java/org/apache/coyote/http2/AbstractNonZeroStream.java @@ -98,4 +98,6 @@ abstract class AbstractNonZeroStream extends AbstractStream { } abstract boolean isClosedFinal(); + + abstract void checkState(FrameType frameType) throws Http2Exception; } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index cc07372..77dc339 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -121,7 +122,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private HpackDecoder hpackDecoder; private HpackEncoder hpackEncoder; - private final Map<Integer,Stream> 
streams = new ConcurrentHashMap<>(); + private final ConcurrentMap<Integer,AbstractNonZeroStream> streams = new ConcurrentHashMap<>(); protected final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); // Start at -1 so the 'add 2' logic in closeIdleStreams() works private volatile int maxActiveRemoteStreamId = -1; @@ -1089,7 +1090,22 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { Integer key = Integer.valueOf(streamId); - Stream result = streams.get(key); + AbstractStream result = streams.get(key); + if (result instanceof Stream) { + return (Stream) result; + } + if (unknownIsError) { + // Stream has been closed and removed from the map + throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key.toString()), + Http2Error.PROTOCOL_ERROR); + } + return null; + } + + + private AbstractNonZeroStream getStreamMayBeClosed(int streamId, boolean unknownIsError) throws ConnectionException { + Integer key = Integer.valueOf(streamId); + AbstractNonZeroStream result = streams.get(key); if (result == null && unknownIsError) { // Stream has been closed and removed from the map throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key.toString()), @@ -1133,10 +1149,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH return; } - for (Stream stream : streams.values()) { - // The connection is closing. Close the associated streams as no - // longer required (also notifies any threads waiting for allocations). - stream.receiveReset(Http2Error.CANCEL.getCode()); + for (AbstractNonZeroStream stream : streams.values()) { + if (stream instanceof Stream) { + // The connection is closing. Close the associated streams as no + // longer required (also notifies any threads waiting for allocations). 
+ ((Stream) stream).receiveReset(Http2Error.CANCEL.getCode()); + } } try { socketWrapper.close(); @@ -1193,10 +1211,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH TreeSet<Integer> candidatesStepTwo = new TreeSet<>(); TreeSet<Integer> candidatesStepThree = new TreeSet<>(); - for (Entry<Integer, Stream> entry : streams.entrySet()) { - Stream stream = entry.getValue(); + for (Entry<Integer, AbstractNonZeroStream> entry : streams.entrySet()) { + AbstractNonZeroStream stream = entry.getValue(); // Never remove active streams - if (stream.isActive()) { + if (stream instanceof Stream && ((Stream) stream).isActive()) { continue; } @@ -1217,7 +1235,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Process the step one list for (Integer streamIdToRemove : candidatesStepOne) { // Remove this childless stream - Stream removedStream = streams.remove(streamIdToRemove); + AbstractNonZeroStream removedStream = streams.remove(streamIdToRemove); removedStream.detachFromParent(); toClose--; if (log.isDebugEnabled()) { @@ -1266,7 +1284,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private void removeStreamFromPriorityTree(Integer streamIdToRemove) { - Stream streamToRemove = streams.remove(streamIdToRemove); + AbstractNonZeroStream streamToRemove = streams.remove(streamIdToRemove); // Move the removed Stream's children to the removed Stream's // parent. 
Set<AbstractNonZeroStream> children = streamToRemove.getChildStreams(); @@ -1415,24 +1433,33 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } } - Stream stream = getStream(streamId, true); - stream.checkState(FrameType.DATA); - stream.receivedData(payloadSize); - return stream.getInputByteBuffer(); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; + stream.checkState(FrameType.DATA); + stream.receivedData(payloadSize); + return stream.getInputByteBuffer(); + } else { + abstractNonZeroStream.checkState(FrameType.DATA); + return null; + } } @Override public void endRequestBodyFrame(int streamId) throws Http2Exception { - Stream stream = getStream(streamId, true); - stream.getInputBuffer().onDataAvailable(); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + ((Stream) abstractNonZeroStream).getInputBuffer().onDataAvailable(); + } } @Override public void receivedEndOfStream(int streamId) throws ConnectionException { - Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); - if (stream != null) { + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed()); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; stream.receivedEndOfStream(); if (!stream.isActive()) { setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet()); @@ -1444,9 +1471,11 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH @Override public void swallowedPadding(int streamId, int paddingLength) throws ConnectionException, IOException { - Stream stream = getStream(streamId, true); - // +1 is for the payload byte used to define the padding length - 
writeWindowUpdate(stream, paddingLength + 1, false); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + if (abstractNonZeroStream instanceof Stream) { + // +1 is for the payload byte used to define the padding length + writeWindowUpdate((Stream) abstractNonZeroStream, paddingLength + 1, false); + } } @@ -1485,10 +1514,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH private void closeIdleStreams(int newMaxActiveRemoteStreamId) { - for (Entry<Integer,Stream> entry : streams.entrySet()) { - if (entry.getKey().intValue() > maxActiveRemoteStreamId && - entry.getKey().intValue() < newMaxActiveRemoteStreamId) { - entry.getValue().closeIfIdle(); + for (Entry<Integer,AbstractNonZeroStream> entry : streams.entrySet()) { + int id = entry.getKey().intValue(); + if (id > maxActiveRemoteStreamId && id < newMaxActiveRemoteStreamId) { + if (entry.getValue() instanceof Stream) { + ((Stream) entry.getValue()).closeIfIdle(); + } } } maxActiveRemoteStreamId = newMaxActiveRemoteStreamId; @@ -1505,16 +1536,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH increaseOverheadCount(); - Stream stream = getStream(streamId, false); - if (stream == null) { - stream = createRemoteStream(streamId); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, false); + if (abstractNonZeroStream == null) { + abstractNonZeroStream = createRemoteStream(streamId); } - stream.checkState(FrameType.PRIORITY); - AbstractStream parentStream = getStream(parentStreamId, false); + AbstractStream parentStream = getStreamMayBeClosed(parentStreamId, false); if (parentStream == null) { parentStream = this; } - stream.rePrioritise(parentStream, exclusive, weight); + abstractNonZeroStream.rePrioritise(parentStream, exclusive, weight); } @@ -1539,9 +1569,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH @Override public void headersEnd(int streamId) 
throws Http2Exception { - Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); - if (stream != null) { + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, connectionState.get().isNewStreamAllowed()); + if (abstractNonZeroStream instanceof Stream) { setMaxProcessedStream(streamId); + Stream stream = (Stream) abstractNonZeroStream; if (stream.isActive()) { if (stream.receivedEndOfHeaders()) { @@ -1576,12 +1607,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.debug(sm.getString("upgradeHandler.reset.receive", getConnectionId(), Integer.toString(streamId), Long.toString(errorCode))); } - Stream stream = getStream(streamId, true); - boolean active = stream.isActive(); - stream.checkState(FrameType.RST); - stream.receiveReset(errorCode); - if (active) { - activeRemoteStreamCount.decrementAndGet(); + AbstractNonZeroStream abstractNonZeroStream = getStreamMayBeClosed(streamId, true); + abstractNonZeroStream.checkState(FrameType.RST); + if (abstractNonZeroStream instanceof Stream) { + Stream stream = (Stream) abstractNonZeroStream; + boolean active = stream.isActive(); + stream.receiveReset(errorCode); + if (active) { + activeRemoteStreamCount.decrementAndGet(); + } } } @@ -1602,11 +1636,11 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Do this first in case new value is invalid remoteSettings.set(setting, value); int diff = (int) (value - oldValue); - for (Stream stream : streams.values()) { + for (AbstractNonZeroStream stream : streams.values()) { try { stream.incrementWindowSize(diff); } catch (Http2Exception h2e) { - stream.close(new StreamException(sm.getString( + ((Stream) stream).close(new StreamException(sm.getString( "upgradeHandler.windowSizeTooBig", connectionId, stream.getIdAsString()), h2e.getError(), stream.getIdAsInt())); @@ -1679,7 +1713,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH 
incrementWindowSize(increment); } else { - Stream stream = getStream(streamId, true); + AbstractNonZeroStream stream = getStreamMayBeClosed(streamId, true); // Check for small increments which are inefficient if (average < overheadThreshold) { @@ -1705,6 +1739,11 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } + void replaceStream(AbstractNonZeroStream original, AbstractNonZeroStream replacement) { + streams.replace(original.getIdentifier(), replacement); + } + + protected class PingManager { protected boolean initiateDisabled = false; diff --git a/java/org/apache/coyote/http2/RecycledStream.java b/java/org/apache/coyote/http2/RecycledStream.java index dbbdc10..1915dff 100644 --- a/java/org/apache/coyote/http2/RecycledStream.java +++ b/java/org/apache/coyote/http2/RecycledStream.java @@ -23,12 +23,12 @@ package org.apache.coyote.http2; class RecycledStream extends AbstractNonZeroStream { private final String connectionId; - private final boolean closedFinal; + private final StreamStateMachine state; - RecycledStream(Stream stream) { - super(stream.getIdentifier(), stream.getWeight()); - connectionId = stream.getConnectionId(); - closedFinal = stream.isClosedFinal(); + RecycledStream(String connectionId, Integer identifier, int weight, StreamStateMachine state) { + super(identifier, weight); + this.connectionId = connectionId; + this.state = state; } @@ -40,6 +40,20 @@ class RecycledStream extends AbstractNonZeroStream { @Override boolean isClosedFinal() { - return closedFinal; + return state.isClosedFinal(); + } + + + @Override + final void checkState(FrameType frameType) throws Http2Exception { + state.checkFrameType(frameType); + } + + + @SuppressWarnings("sync-override") + @Override + void incrementWindowSize(int increment) throws Http2Exception { + // NO-OP } } + diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index 33f151a..29cea54 100644 --- 
a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -193,6 +193,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { } + @Override final void checkState(FrameType frameType) throws Http2Exception { state.checkFrameType(frameType); } @@ -688,24 +689,17 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { /* * This method is called recycle for consistency with the rest of the Tomcat - * code base. Currently, it only sets references to null for the purposes of - * reducing memory footprint. It does not fully recycle the Stream ready for - * re-use since Stream objects are not re-used. This is useful because - * Stream instances are retained for a period after the Stream closes. + * code base. Currently, it calls the handler to replace this stream with an + * implementation that uses less memory. It does not fully recycle the + * Stream ready for re-use since Stream objects are not re-used. This is + * useful because Stream instances are retained for a period after the + * Stream closes. */ final void recycle() { if (log.isDebugEnabled()) { log.debug(sm.getString("stream.recycle", getConnectionId(), getIdAsString())); } - /* - * Temporarily disabled due to multiple regressions (NPEs) - coyoteRequest = null; - cookieHeader = null; - coyoteResponse = null; - inputBuffer = null; - streamOutputBuffer = null; - http2OutputBuffer = null; - */ + handler.replaceStream(this, new RecycledStream(getConnectionId(), getIdentifier(), getWeight(), state)); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org