This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new a51446b Refactor Stream / Connection flow control window allocation a51446b is described below commit a51446b5bb943ab8bb22d4e821c246da2fca73c4 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon Jun 3 14:15:59 2019 +0100 Refactor Stream / Connection flow control window allocation --- .../apache/coyote/http2/Http2UpgradeHandler.java | 78 +++------ .../apache/coyote/http2/LocalStrings.properties | 11 +- .../apache/coyote/http2/LocalStrings_fr.properties | 2 - .../apache/coyote/http2/LocalStrings_ja.properties | 2 - .../apache/coyote/http2/LocalStrings_ko.properties | 2 - java/org/apache/coyote/http2/Stream.java | 50 +++--- .../coyote/http2/WindowAllocationManager.java | 189 +++++++++++++++++++++ 7 files changed, 243 insertions(+), 91 deletions(-) diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index de54556..665b1a0 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -35,12 +35,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.servlet.http.WebConnection; -import org.apache.coyote.ActionCode; import org.apache.coyote.Adapter; import org.apache.coyote.CloseNowException; import org.apache.coyote.ProtocolException; import org.apache.coyote.Request; -import org.apache.coyote.Response; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http2.HpackDecoder.HeaderEmitter; import org.apache.coyote.http2.HpackEncoder.State; @@ -747,11 +745,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException { - // Need to be holding the connection allocation lock so releaseBacklog() - // can't notify this thread until after this thread enters wait() + // Need to be holding the stream lock so releaseBacklog() can't notify + // this thread until after this thread enters wait() int allocation = 0; - Object connectionAllocationLock = stream.getConnectionAllocationLock(); - synchronized (connectionAllocationLock) { + synchronized (stream) { do { synchronized (this) { if (!stream.canWrite()) { @@ -804,38 +801,35 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // request is for a stream, use the connection // timeout long writeTimeout = protocol.getWriteTimeout(); - if (writeTimeout < 0) { - connectionAllocationLock.wait(); - } else { - connectionAllocationLock.wait(writeTimeout); - // Has this stream been granted an allocation - // Note: If the stream in not in this Map then the - // requested write has been fully allocated - BacklogTracker tracker; - // Ensure allocations made in other threads are visible - synchronized (this) { - tracker = backLogStreams.get(stream); + stream.waitForConnectionAllocation(writeTimeout); + // Has this stream been granted an allocation + // Note: If the stream in not in this Map then the + // requested write has been fully allocated + BacklogTracker tracker; + // Ensure allocations made in other threads are visible + synchronized (this) { + tracker = backLogStreams.get(stream); + } + if (tracker != null && tracker.getUnusedAllocation() == 0) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.noAllocation", + connectionId, stream.getIdentifier())); } - if (tracker != null && tracker.getUnusedAllocation() == 0) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.noAllocation", - connectionId, stream.getIdentifier())); - } - // No allocation - // Close the connection. Do this first since - // closing the stream will raise an exception - close(); - // Close the stream (in app code so need to - // signal to app stream is closing) - stream.doWriteTimeout(); + // No allocation + // Close the connection. Do this first since + // closing the stream will raise an exception + close(); + // Close the stream (in app code so need to + // signal to app stream is closing) + stream.doWriteTimeout(); } - } } catch (InterruptedException e) { throw new IOException(sm.getString( "upgradeHandler.windowSizeReservationInterrupted", connectionId, stream.getIdentifier(), Integer.toString(reservation)), e); } } else { + stream.waitForConnectionAllocationNonBlocking(); return 0; } } @@ -872,29 +866,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (this == stream) { continue; } - Response coyoteResponse = ((Stream) stream).getCoyoteResponse(); - if (coyoteResponse.getWriteListener() == null) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.notify", - connectionId, stream.getIdentifier())); - } - // Blocking, so use notify to release StreamOutputBuffer - Object connectionAllocationLock = ((Stream) stream).getConnectionAllocationLock(); - synchronized (connectionAllocationLock) { - connectionAllocationLock.notify(); - } - } else { - if (log.isDebugEnabled()) { - log.debug(sm.getString("upgradeHandler.dispatchWrite", - connectionId, stream.getIdentifier())); - } - // Non-blocking so dispatch - coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); - // Need to explicitly execute dispatches on the - // StreamProcessor as this thread is being processed by an - // UpgradeProcessor which won't see this dispatch - coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null); - } + ((Stream) stream).notifyConnection(); } } } diff --git a/java/org/apache/coyote/http2/LocalStrings.properties b/java/org/apache/coyote/http2/LocalStrings.properties index 3527b30..e6b5abc 100644 --- a/java/org/apache/coyote/http2/LocalStrings.properties +++ b/java/org/apache/coyote/http2/LocalStrings.properties @@ -122,7 +122,6 @@ upgradeHandler.allocate.left=Connection [{0}], Stream [{1}], [{2}] bytes unalloc upgradeHandler.allocate.recipient=Connection [{0}], Stream [{1}], potential recipient [{2}] with weight [{3}] upgradeHandler.connectionError=Connection error upgradeHandler.dependency.invalid=Connection [{0}], Stream [{1}], Streams may not depend on themselves -upgradeHandler.dispatchWrite=Connection [{0}], Stream [{1}], Dispatching to container thread for async write upgradeHandler.goaway.debug=Connection [{0}], Goaway, Last stream [{1}], Error code [{2}], Debug data [{3}] upgradeHandler.init=Connection [{0}], State [{1}] upgradeHandler.initialWindowSize.invalid=Connection [{0}], Illegal value of [{1}] ignored for initial window size @@ -130,7 +129,6 @@ upgradeHandler.invalidPreface=Connection [{0}], Invalid connection preface upgradeHandler.ioerror=Connection [{0}] upgradeHandler.noAllocation=Connection [{0}], Stream [{1}], Timeout waiting for allocation upgradeHandler.noNewStreams=Connection [{0}], Stream [{1}], Stream ignored as no new streams are permitted on this connection -upgradeHandler.notify=Connection [{0}], Stream [{1}], notify() called to release StreamOutputBuffer upgradeHandler.pause.entry=Connection [{0}] Pausing upgradeHandler.pingFailed=Connection [{0}] Failed to send ping to client upgradeHandler.prefaceReceived=Connection [{0}], Connection preface received from client @@ -160,5 +158,14 @@ upgradeHandler.writeBody=Connection [{0}], Stream [{1}], Data length [{2}] upgradeHandler.writeHeaders=Connection [{0}], Stream [{1}] upgradeHandler.writePushHeaders=Connection [{0}], Stream [{1}], Pushed stream [{2}], EndOfStream [{3}] +windowAllocationManager.dispatched=Connection [{0}], Stream [{1}], Dispatched +windowAllocationManager.notify=Connection [{0}], Stream [{1}], Waiting type [{2}], Notify type [{3}] +windowAllocationManager.notified=Connection [{0}], Stream [{1}], Notified +windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (blocking) with timeout [{3}] +windowAllocationManager.waitFor.stream=Connection [{0}], Stream [{1}], Waiting for Stream flow control window (blocking) with timeout [{3}] +windowAllocationManager.waitFor.ise=Connection [{0}], Stream [{1}], Already waiting +windowAllocationManager.waitForNonBlocking.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (non-blocking) +windowAllocationManager.waitForNonBlocking.stram=Connection [{0}], Stream [{1}], Waiting for Stream flow control window (non-blocking) + writeStateMachine.endWrite.ise=It is illegal to specify [{0}] for the new state once a write has completed writeStateMachine.ise=It is illegal to call [{0}()] in state [{1}] diff --git a/java/org/apache/coyote/http2/LocalStrings_fr.properties b/java/org/apache/coyote/http2/LocalStrings_fr.properties index e6e1cf3..2c6c4d4 100644 --- a/java/org/apache/coyote/http2/LocalStrings_fr.properties +++ b/java/org/apache/coyote/http2/LocalStrings_fr.properties @@ -122,7 +122,6 @@ upgradeHandler.allocate.left=Connection [{0}], Flux [{1}], [{2}] octets désallo upgradeHandler.allocate.recipient=Connection [{0}], Flux [{1}], receveur potentiel [{2}] avec poids [{3}] upgradeHandler.connectionError=Erreur de la connection upgradeHandler.dependency.invalid=Connection [{0}], Flux [{1}], Un flux ne peut dépendre de lui-même -upgradeHandler.dispatchWrite=Connection [{0}], Flux [{1}], Envoi de l''évènement écriture asynchrone sur un thread du conteneur upgradeHandler.goaway.debug=Connection [{0}], Goaway, Dernier flux [{1}], Code d''erreur [{2}], Données de débogage [{3}] upgradeHandler.init=Connection [{0}], Etat [{1}] upgradeHandler.initialWindowSize.invalid=Connection [{0}], La valeur [{1}] initiale de la taille de fenêtre est invalide @@ -130,7 +129,6 @@ upgradeHandler.invalidPreface=Connection [{0}], Préface de connection invalide upgradeHandler.ioerror=Connection [{0}] upgradeHandler.noAllocation=Connection [{0}], Flux [{1}], Temps d''attente maximum dépassé lors de l''allocation upgradeHandler.noNewStreams=Connection [{0}], Flux [{1}], Flux ignoré car aucun nouveau flux n''est autorisé sur cette connection -upgradeHandler.notify=Connection [{0}], Flux [{1}], notify() appelé pour désallouer StreamOutputBuffer upgradeHandler.pause.entry=Connection [{0}] mise en pause upgradeHandler.pingFailed=La connection [{0}] a échoué à envoyer un ping au client upgradeHandler.prefaceReceived=Connection [{0}], préface de la connection recue du client diff --git a/java/org/apache/coyote/http2/LocalStrings_ja.properties b/java/org/apache/coyote/http2/LocalStrings_ja.properties index 0f69b6e..67f57e6 100644 --- a/java/org/apache/coyote/http2/LocalStrings_ja.properties +++ b/java/org/apache/coyote/http2/LocalStrings_ja.properties @@ -117,14 +117,12 @@ upgradeHandler.allocate.left=コネクション[{0}]、ストリーム[{1}]、[{ upgradeHandler.allocate.recipient=コネクション[{0}]、ストリーム[{1}]、重み[{3}]の潜在的な受信者[{2}] upgradeHandler.connectionError=接続エラー upgradeHandler.dependency.invalid=コネクション [{0}]、ストリーム [{1}]、ストリームは自分自身に依存するべきではありません。 -upgradeHandler.dispatchWrite=コネクション[{0}]、ストリーム[{1}]、非同期書き込みのコンテナスレッドへのディスパッチ upgradeHandler.goaway.debug=コネクション[{0}]、Goaway、最終ストリーム[{1}]、エラーコード[{2}]、デバッグデータ[{3}] upgradeHandler.init=コネクション[{0}]、状態[{1}] upgradeHandler.initialWindowSize.invalid=コネクション[{0}]、[{1}]の無効な値は初期ウィンドウサイズで無視されました upgradeHandler.invalidPreface=コネクション[{0}]、無効なConnection Preface upgradeHandler.ioerror=コネクション[{0}] upgradeHandler.noNewStreams=コネクション [{0}]、ストリーム [{1}]、このコネクションには新しいストリームを作成できないためストリームを無視します。 -upgradeHandler.notify=コネクション [{0}]、ストリーム [{1}]、解放した StreamOutputBuffer の notify() を呼び出しました。 upgradeHandler.pause.entry=コネクション[{0}] 一時停止中 upgradeHandler.pingFailed=コネクション [{0}]、クライアントへ ping を送信できません。 upgradeHandler.prefaceReceived=コネクション [{0}]、クライアントからコネクションプリフェイスを受信しました。 diff --git a/java/org/apache/coyote/http2/LocalStrings_ko.properties b/java/org/apache/coyote/http2/LocalStrings_ko.properties index c4f3bc2..65d4d99 100644 --- a/java/org/apache/coyote/http2/LocalStrings_ko.properties +++ b/java/org/apache/coyote/http2/LocalStrings_ko.properties @@ -119,14 +119,12 @@ upgradeHandler.allocate.left=연결 [{0}], 스트림 [{1}], [{2}] 바이트들 upgradeHandler.allocate.recipient=연결 [{0}], 스트림 [{1}], 가중치 [{3}]의 잠재적 수신자 [{2}] upgradeHandler.connectionError=연결 오류 upgradeHandler.dependency.invalid=연결 [{0}], 스트림 [{1}], 스트림들은 자기 자신들에 의존해서는 안됩니다. -upgradeHandler.dispatchWrite=연결 [{0}], 스트림 [{1}], 비동기 쓰기를 위한 컨테이너 쓰레드로 디스패치합니다. upgradeHandler.goaway.debug=연결 [{0}], Goaway, 마지막 스트림 [{1}], 오류 코드 [{2}], 디버그 데이터 [{3}] upgradeHandler.init=연결 [{0}], 상태 [{1}] upgradeHandler.initialWindowSize.invalid=연결 [{0}]: 값 [{1}]은(는), 초기 윈도우 크기로서 불허되므로, 무시됩니다. upgradeHandler.invalidPreface=연결 [{0}]: 유효하지 않은 연결 preface upgradeHandler.ioerror=연결 [{0}] upgradeHandler.noNewStreams=연결 [{0}], 스트림 [{1}], 이 연결에는 새로운 스트림들이 허용되지 않기에, 스트림이 무시되었습니다. -upgradeHandler.notify=연결 [{0}], 스트림 [{1}], notify()이 StreamOutputBuffer를 해제하기 위하여 호출되었습니다. upgradeHandler.pause.entry=연결 [{0}]이(가) 일시 정지 중 upgradeHandler.pingFailed=연결 [{0}]: 클라이언트에 ping 메시지를 보내지 못했습니다. upgradeHandler.prefaceReceived=연결 [{0}]: 연결 preface를 클라이언트로부터 받았습니다. diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index d437c98..b70c227 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -72,7 +72,7 @@ class Stream extends AbstractStream implements HeaderEmitter { private final Http2UpgradeHandler handler; private final StreamStateMachine state; - private final Object connectionAllocationLock = new Object(); + private final WindowAllocationManager allocationManager = new WindowAllocationManager(this); // State machine would be too much overhead private int headerState = HEADER_STATE_START; @@ -236,14 +236,7 @@ class Stream extends AbstractStream implements HeaderEmitter { final void cancelAllocationRequests() { - // Cancel wait on stream allocation request (if any) - synchronized (this) { - this.notify(); - } - // Cancel wait on connection allocation request (if any) - synchronized (connectionAllocationLock) { - connectionAllocationLock.notify(); - } + allocationManager.notifyAny(); } @@ -261,17 +254,7 @@ class Stream extends AbstractStream implements HeaderEmitter { boolean notify = getWindowSize() < 1; super.incrementWindowSize(windowSizeIncrement); if (notify && getWindowSize() > 0) { - if (coyoteResponse.getWriteListener() == null) { - // Blocking, so use notify to release StreamOutputBuffer - notify(); - } else { - // Non-blocking so dispatch - coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); - // Need to explicitly execute dispatches on the StreamProcessor - // as this thread is being processed by an UpgradeProcessor - // which won't see this dispatch - coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null); - } + allocationManager.notifyStream(); } } @@ -287,11 +270,7 @@ class Stream extends AbstractStream implements HeaderEmitter { if (block) { try { long writeTimeout = handler.getProtocol().getStreamWriteTimeout(); - if (writeTimeout < 0) { - wait(); - } else { - wait(writeTimeout); - } + allocationManager.waitForStream(writeTimeout); windowSize = getWindowSize(); if (windowSize == 0) { doWriteTimeout(); @@ -303,6 +282,7 @@ class Stream extends AbstractStream implements HeaderEmitter { throw new IOException(e); } } else { + allocationManager.waitForStreamNonBlocking(); return 0; } } @@ -332,6 +312,21 @@ class Stream extends AbstractStream implements HeaderEmitter { } + void waitForConnectionAllocation(long timeout) throws InterruptedException { + allocationManager.waitForConnection(timeout); + } + + + void waitForConnectionAllocationNonBlocking() { + allocationManager.waitForConnectionNonBlocking(); + } + + + void notifyConnection() { + allocationManager.notifyConnection(); + } + + @Override public final void emitHeader(String name, String value) throws HpackException { if (log.isDebugEnabled()) { @@ -783,11 +778,6 @@ class Stream extends AbstractStream implements HeaderEmitter { } - Object getConnectionAllocationLock() { - return connectionAllocationLock; - } - - private static void push(final Http2UpgradeHandler handler, final Request request, final Stream stream) throws IOException { if (org.apache.coyote.Constants.IS_SECURITY_ENABLED) { diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java b/java/org/apache/coyote/http2/WindowAllocationManager.java new file mode 100644 index 0000000..7626bf3 --- /dev/null +++ b/java/org/apache/coyote/http2/WindowAllocationManager.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http2; + +import org.apache.coyote.ActionCode; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; + +/** + * Tracks whether the stream is waiting for an allocation to the stream flow + * control window, to the connection flow control window or not waiting for an + * allocation and only issues allocation notifications when the stream is known + * to be waiting for the notification. + * + * It is possible for a stream to be waiting for a connection allocation when + * a stream allocation is made. Therefore this class tracks the type of + * allocation that the stream is waiting for to ensure that notifications are + * correctly triggered. + * + * With the implementation at the time of writing, it is not possible for a + * stream to receive an unexpected connection notification as these are only + * issues to streams in the backlog and a stream must be waiting for a + * connection allocation in order to be placed on the backlog. However, as a + * precaution, this class protects against unexpected connection notifications. + * + * It is important for asynchronous processing not to notify unless a + * notification is expected else a dispatch will be performed unnecessarily + * which may lead to unexpected results. + * + * A previous implementation used separate locks for the stream and connection + * notifications. However, correct handling of allocation waiting requires + * holding the stream lock when making the decision to wait. Therefore both + * allocations need to wait on the Stream. + */ +class WindowAllocationManager { + + private static final Log log = LogFactory.getLog(WindowAllocationManager.class); + private static final StringManager sm = StringManager.getManager(WindowAllocationManager.class); + + private static final int NONE = 0; + private static final int STREAM = 1; + private static final int CONNECTION = 2; + + private final Stream stream; + + private int waitingFor = NONE; + + WindowAllocationManager(Stream stream) { + this.stream = stream; + } + + void waitForStream(long timeout) throws InterruptedException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitFor.stream", + stream.getConnectionId(), stream.getIdentifier(), Long.toString(timeout))); + } + + waitFor(STREAM, timeout); + } + + + void waitForConnection(long timeout) throws InterruptedException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitFor.connection", + stream.getConnectionId(), stream.getIdentifier(), Long.toString(timeout))); + } + + waitFor(CONNECTION, timeout); + } + + + void waitForStreamNonBlocking() { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.stream", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitForNonBlocking(STREAM); + } + + + void waitForConnectionNonBlocking() { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.connection", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitForNonBlocking(CONNECTION); + } + + + void notifyStream() { + notify(STREAM); + } + + + void notifyConnection() { + notify(CONNECTION); + } + + + void notifyAny() { + notify(STREAM | CONNECTION); + } + + + private void waitFor(int waitTarget, long timeout) throws InterruptedException { + synchronized (stream) { + if (waitingFor != 0) { + throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", + stream.getConnectionId(), stream.getIdentifier())); + } + + waitingFor = waitTarget; + + if (timeout < 0) { + stream.wait(); + } else { + stream.wait(timeout); + } + + waitingFor = 0; + } + } + + + private void waitForNonBlocking(int waitTarget) { + synchronized (stream) { + if (waitingFor == 0) { + waitingFor = waitTarget; + } else if (waitingFor == waitTarget) { + // NO-OP + // Non-blocking post-processing may attempt to flush + } else { + throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise", + stream.getConnectionId(), stream.getIdentifier())); + } + + } + } + + + private void notify(int notifyTarget) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.notify", stream.getConnectionId(), + stream.getIdentifier(), Integer.toString(waitingFor), Integer.toString(notifyTarget))); + } + + synchronized (stream) { + if ((notifyTarget & waitingFor) > 0) { + if (stream.getCoyoteResponse().getWriteListener() == null) { + // Blocking, so use notify to release StreamOutputBuffer + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.notified", + stream.getConnectionId(), stream.getIdentifier())); + } + stream.notify(); + } else { + waitingFor = 0; + // Non-blocking so dispatch + if (log.isDebugEnabled()) { + log.debug(sm.getString("windowAllocationManager.dispatched", + stream.getConnectionId(), stream.getIdentifier())); + } + stream.getCoyoteResponse().action(ActionCode.DISPATCH_WRITE, null); + // Need to explicitly execute dispatches on the StreamProcessor + // as this thread is being processed by an UpgradeProcessor + // which won't see this dispatch + stream.getCoyoteResponse().action(ActionCode.DISPATCH_EXECUTE, null); + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org