This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new d079dec009 [3.3] Add HTTP/2 server connection preface process (#15535)
d079dec009 is described below
commit d079dec0091eb748d61851de2e957709a0b7853d
Author: zrlw <[email protected]>
AuthorDate: Sun Jul 20 11:04:00 2025 +0800
[3.3] Add HTTP/2 server connection preface process (#15535)
* Add HTTP/2 server connection preface process
* Close channel handler context and release resources if client connection
preface does not arrive in time
---
.../http12/netty4/h2/NettyHttp2FrameCodec.java | 113 +++++++++++++++++++--
.../netty4/h2/NettyHttp2SettingsHandler.java | 78 ++++++++++++++
.../netty4/http2/Http2ClientSettingsHandler.java | 5 +-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 14 ++-
4 files changed, 198 insertions(+), 12 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
index 8a7d13e713..2a8ad9a087 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
@@ -16,6 +16,8 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h2;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
@@ -25,6 +27,9 @@ import
org.apache.dubbo.remoting.http12.message.DefaultHttpHeaders;
import org.apache.dubbo.remoting.http12.netty4.NettyHttpHeaders;
import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
@@ -37,9 +42,32 @@ import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_TIMEOUT_SERVER;
public class NettyHttp2FrameCodec extends ChannelDuplexHandler {
+ private static final ErrorTypeAwareLogger LOGGER =
+ LoggerFactory.getErrorTypeAwareLogger(NettyHttp2FrameCodec.class);
+
+ private static final long SETTINGS_FRAME_ARRIVAL_TIMEOUT = 3;
+
+ private final NettyHttp2SettingsHandler nettyHttp2SettingsHandler;
+
+ private final List<CachedMsg> cachedMsgList = new LinkedList<>();
+
+ private boolean settingsFrameArrived;
+
+ private ScheduledFuture<?> settingsFrameArrivalTimeoutFuture;
+
+ public NettyHttp2FrameCodec(NettyHttp2SettingsHandler
nettyHttp2SettingsHandler) {
+ this.nettyHttp2SettingsHandler = nettyHttp2SettingsHandler;
+ if (!nettyHttp2SettingsHandler.subscribeSettingsFrameArrival(this)) {
+ settingsFrameArrived = true;
+ }
+ }
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
if (msg instanceof Http2HeadersFrame) {
@@ -53,15 +81,76 @@ public class NettyHttp2FrameCodec extends
ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
- if (msg instanceof Http2Header) {
- super.write(ctx, encodeHttp2HeadersFrame((Http2Header) msg),
promise);
- } else if (msg instanceof Http2OutputMessage) {
- super.write(ctx, encodeHttp2DataFrame((Http2OutputMessage) msg),
promise);
- } else {
- super.write(ctx, msg, promise);
+ if (settingsFrameArrived) {
+ if (msg instanceof Http2Header) {
+ super.write(ctx, encodeHttp2HeadersFrame((Http2Header) msg),
promise);
+ } else if (msg instanceof Http2OutputMessage) {
+ super.write(ctx, encodeHttp2DataFrame((Http2OutputMessage)
msg), promise);
+ } else {
+ super.write(ctx, msg, promise);
+ }
+ return;
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Cache writing msg before client connection preface
arrival: {}", msg);
+ }
+ cachedMsgList.add(new CachedMsg(ctx, msg, promise));
+
+ if (settingsFrameArrivalTimeoutFuture == null) {
+ // close ctx and release resources if client connection preface
does not arrive in time.
+ settingsFrameArrivalTimeoutFuture = ctx.executor()
+ .schedule(
+ () -> {
+ LOGGER.error(
+ PROTOCOL_TIMEOUT_SERVER,
+ "",
+ "",
+ "client connection preface does not
arrive in time.");
+ // send RST_STREAM instead of GO_AWAY by
calling close method to avoid client hanging.
+ ctx.close();
+
nettyHttp2SettingsHandler.unsubscribeSettingsFrameArrival(this);
+ cachedMsgList.clear();
+ },
+ SETTINGS_FRAME_ARRIVAL_TIMEOUT,
+ TimeUnit.SECONDS);
}
}
+ public void notifySettingsFrameArrival() throws Exception {
+ if (settingsFrameArrived) {
+ return;
+ }
+ settingsFrameArrived = true;
+
+ if (settingsFrameArrivalTimeoutFuture != null) {
+ settingsFrameArrivalTimeoutFuture.cancel(false);
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Begin cached channel msg writing when client
connection preface arrived.");
+ }
+
+ for (CachedMsg cached : cachedMsgList) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Cached channel msg writing, ctx: {} msg: {}",
cached.ctx, cached.msg);
+ }
+ if (cached.msg instanceof Http2Header) {
+ super.write(cached.ctx, encodeHttp2HeadersFrame(((Http2Header)
cached.msg)), cached.promise);
+ } else if (cached.msg instanceof Http2OutputMessage) {
+ super.write(cached.ctx,
encodeHttp2DataFrame(((Http2OutputMessage) cached.msg)), cached.promise);
+ } else {
+ super.write(cached.ctx, cached.msg, cached.promise);
+ }
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("End cached channel msg writing.");
+ }
+
+ cachedMsgList.clear();
+ }
+
private Http2Header onHttp2HeadersFrame(Http2HeadersFrame headersFrame) {
return new Http2MetadataFrame(
headersFrame.stream().id(), new
DefaultHttpHeaders(headersFrame.headers()), headersFrame.isEndStream());
@@ -89,4 +178,16 @@ public class NettyHttp2FrameCodec extends
ChannelDuplexHandler {
}
throw new IllegalArgumentException("Http2OutputMessage body must be
ByteBufOutputStream");
}
+
+ private static class CachedMsg {
+ private final ChannelHandlerContext ctx;
+ private final Object msg;
+ private final ChannelPromise promise;
+
+ public CachedMsg(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) {
+ this.ctx = ctx;
+ this.msg = msg;
+ this.promise = promise;
+ }
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
new file mode 100644
index 0000000000..bfe885732c
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.netty4.h2;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+
+/**
+ * Add NettyHttp2SettingsHandler to pipeline because NettyHttp2FrameCodec
could not receive Http2SettingsFrame.
+ * Http2SettingsFrame does not belong to Http2StreamFrame or Http2GoAwayFrame
that Http2MultiplexHandler
+ * could process, NettyHttp2FrameCodec is wrapped in Http2MultiplexHandler as
a child handler.
+ */
+public class NettyHttp2SettingsHandler extends
SimpleChannelInboundHandler<Http2SettingsFrame> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(NettyHttp2SettingsHandler.class);
+
+ /**
+ * Http2SettingsFrame arrival notification subscribers.
+ */
+ private final Set<NettyHttp2FrameCodec> settingsFrameArrivalSubscribers =
new HashSet<>();
+
+ private boolean settingsFrameArrived;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame
msg) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receive client Http2 Settings frame of "
+ + ctx.channel().localAddress() + " <- " +
ctx.channel().remoteAddress());
+ }
+ settingsFrameArrived = true;
+
+ // Notify all subscribers that Http2SettingsFrame is arrived.
+ for (NettyHttp2FrameCodec nettyHttp2FrameCodec :
settingsFrameArrivalSubscribers) {
+ nettyHttp2FrameCodec.notifySettingsFrameArrival();
+ }
+ settingsFrameArrivalSubscribers.clear();
+
+ ctx.pipeline().remove(this);
+ }
+
+ /**
+ * Save Http2SettingsFrame arrival notification subscriber if
Http2SettingsFrame is not arrived.
+ * @param nettyHttp2FrameCodec the netty HTTP2 frame codec that will be
notified.
+ * @return true: subscribe successful, false: Http2SettingsFrame arrived.
+ */
+ public boolean subscribeSettingsFrameArrival(NettyHttp2FrameCodec
nettyHttp2FrameCodec) {
+ if (!settingsFrameArrived) {
+ settingsFrameArrivalSubscribers.add(nettyHttp2FrameCodec);
+ return true;
+ }
+ return false;
+ }
+
+ public void unsubscribeSettingsFrameArrival(NettyHttp2FrameCodec
nettyHttp2FrameCodec) {
+ settingsFrameArrivalSubscribers.remove(nettyHttp2FrameCodec);
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
index 35eb7bbb58..9e1fbffce7 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
@@ -39,8 +39,8 @@ public class Http2ClientSettingsHandler extends
SimpleChannelInboundHandler<Http
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame
msg) throws Exception {
if (logger.isDebugEnabled()) {
- logger.debug("Receive Http2 Settings frame of " +
ctx.channel().localAddress() + " -> "
- + ctx.channel().remoteAddress());
+ logger.debug("Receive server Http2 Settings frame of "
+ + ctx.channel().localAddress() + " -> " +
ctx.channel().remoteAddress());
}
// connectionPrefaceReceivedPromise will be set null after first used.
Promise<Void> connectionPrefaceReceivedPromise =
connectionPrefaceReceivedPromiseRef.get();
@@ -49,6 +49,7 @@ public class Http2ClientSettingsHandler extends
SimpleChannelInboundHandler<Http
} else {
// Notify the connection preface is received when first inbound
http2 settings frame is arrived.
connectionPrefaceReceivedPromise.trySuccess(null);
+ ctx.pipeline().remove(this);
}
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index bbd53f4ab1..eac0646b3a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -31,6 +31,7 @@ import
org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1Codec;
import org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1ConnectionHandler;
import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec;
import
org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler;
+import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2SettingsHandler;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.remoting.websocket.netty4.WebSocketFrameCodec;
import
org.apache.dubbo.remoting.websocket.netty4.WebSocketProtocolSelectorHandler;
@@ -155,12 +156,14 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
sourceCodec,
protocol -> {
if
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME,
protocol)) {
+ NettyHttp2SettingsHandler nettyHttp2SettingsHandler =
new NettyHttp2SettingsHandler();
return new Http2ServerUpgradeCodec(
buildHttp2FrameCodec(tripleConfig),
+ nettyHttp2SettingsHandler,
new HttpWriteQueueHandler(),
new FlushConsolidationHandler(64, true),
new TripleServerConnectionHandler(),
- buildHttp2MultiplexHandler(url, tripleConfig),
+
buildHttp2MultiplexHandler(nettyHttp2SettingsHandler, url, tripleConfig),
new TripleTailHandler());
} else if
(AsciiString.contentEquals(HttpHeaderValues.WEBSOCKET, protocol)) {
return new WebSocketServerUpgradeCodec(
@@ -191,12 +194,13 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
url, frameworkModel, tripleConfig,
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
}
- private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url,
TripleConfig tripleConfig) {
+ private Http2MultiplexHandler buildHttp2MultiplexHandler(
+ NettyHttp2SettingsHandler nettyHttp2SettingsHandler, URL url,
TripleConfig tripleConfig) {
return new Http2MultiplexHandler(new
ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(Http2StreamChannel ch) {
ChannelPipeline p = ch.pipeline();
- p.addLast(new NettyHttp2FrameCodec());
+ p.addLast(new NettyHttp2FrameCodec(nettyHttp2SettingsHandler));
p.addLast(new NettyHttp2ProtocolSelectorHandler(
url, frameworkModel, tripleConfig,
GenericHttp2ServerTransportListenerFactory.INSTANCE));
}
@@ -204,11 +208,13 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
}
private void configurerHttp2Handlers(URL url, List<ChannelHandler>
handlers) {
+ NettyHttp2SettingsHandler nettyHttp2SettingsHandler = new
NettyHttp2SettingsHandler();
TripleConfig tripleConfig =
ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
Http2FrameCodec codec = buildHttp2FrameCodec(tripleConfig);
- Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url,
tripleConfig);
+ Http2MultiplexHandler handler =
buildHttp2MultiplexHandler(nettyHttp2SettingsHandler, url, tripleConfig);
handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
handlers.add(new ChannelHandlerPretender(codec));
+ handlers.add(new ChannelHandlerPretender(nettyHttp2SettingsHandler));
handlers.add(new ChannelHandlerPretender(new
FlushConsolidationHandler(64, true)));
handlers.add(new ChannelHandlerPretender(new
TripleServerConnectionHandler()));
handlers.add(new ChannelHandlerPretender(handler));