This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new d079dec009 [3.3] Add HTTP/2 server connection preface process (#15535)
d079dec009 is described below

commit d079dec0091eb748d61851de2e957709a0b7853d
Author: zrlw <[email protected]>
AuthorDate: Sun Jul 20 11:04:00 2025 +0800

    [3.3] Add HTTP/2 server connection preface process (#15535)
    
    * Add HTTP/2 server connection preface process
    
    * Close channel handler context and release resources if client connection preface does not arrive in time
---
 .../http12/netty4/h2/NettyHttp2FrameCodec.java     | 113 +++++++++++++++++++--
 .../netty4/h2/NettyHttp2SettingsHandler.java       |  78 ++++++++++++++
 .../netty4/http2/Http2ClientSettingsHandler.java   |   5 +-
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  14 ++-
 4 files changed, 198 insertions(+), 12 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
index 8a7d13e713..2a8ad9a087 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dubbo.remoting.http12.netty4.h2;
 
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
 import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
 import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
@@ -25,6 +27,9 @@ import 
org.apache.dubbo.remoting.http12.message.DefaultHttpHeaders;
 import org.apache.dubbo.remoting.http12.netty4.NettyHttpHeaders;
 
 import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
@@ -37,9 +42,32 @@ import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
 import io.netty.handler.codec.http2.Http2DataFrame;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_TIMEOUT_SERVER;
 
 public class NettyHttp2FrameCodec extends ChannelDuplexHandler {
 
+    private static final ErrorTypeAwareLogger LOGGER =
+            LoggerFactory.getErrorTypeAwareLogger(NettyHttp2FrameCodec.class);
+
+    private static final long SETTINGS_FRAME_ARRIVAL_TIMEOUT = 3;
+
+    private final NettyHttp2SettingsHandler nettyHttp2SettingsHandler;
+
+    private final List<CachedMsg> cachedMsgList = new LinkedList<>();
+
+    private boolean settingsFrameArrived;
+
+    private ScheduledFuture<?> settingsFrameArrivalTimeoutFuture;
+
+    public NettyHttp2FrameCodec(NettyHttp2SettingsHandler 
nettyHttp2SettingsHandler) {
+        this.nettyHttp2SettingsHandler = nettyHttp2SettingsHandler;
+        if (!nettyHttp2SettingsHandler.subscribeSettingsFrameArrival(this)) {
+            settingsFrameArrived = true;
+        }
+    }
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         if (msg instanceof Http2HeadersFrame) {
@@ -53,15 +81,76 @@ public class NettyHttp2FrameCodec extends 
ChannelDuplexHandler {
 
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
-        if (msg instanceof Http2Header) {
-            super.write(ctx, encodeHttp2HeadersFrame((Http2Header) msg), 
promise);
-        } else if (msg instanceof Http2OutputMessage) {
-            super.write(ctx, encodeHttp2DataFrame((Http2OutputMessage) msg), 
promise);
-        } else {
-            super.write(ctx, msg, promise);
+        if (settingsFrameArrived) {
+            if (msg instanceof Http2Header) {
+                super.write(ctx, encodeHttp2HeadersFrame((Http2Header) msg), 
promise);
+            } else if (msg instanceof Http2OutputMessage) {
+                super.write(ctx, encodeHttp2DataFrame((Http2OutputMessage) 
msg), promise);
+            } else {
+                super.write(ctx, msg, promise);
+            }
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Cache writing msg before client connection preface 
arrival: {}", msg);
+        }
+        cachedMsgList.add(new CachedMsg(ctx, msg, promise));
+
+        if (settingsFrameArrivalTimeoutFuture == null) {
+            // close ctx and release resources if client connection preface 
does not arrive in time.
+            settingsFrameArrivalTimeoutFuture = ctx.executor()
+                    .schedule(
+                            () -> {
+                                LOGGER.error(
+                                        PROTOCOL_TIMEOUT_SERVER,
+                                        "",
+                                        "",
+                                        "client connection preface does not 
arrive in time.");
+                                // send RST_STREAM instead of GO_AWAY by 
calling close method to avoid client hanging.
+                                ctx.close();
+                                
nettyHttp2SettingsHandler.unsubscribeSettingsFrameArrival(this);
+                                cachedMsgList.clear();
+                            },
+                            SETTINGS_FRAME_ARRIVAL_TIMEOUT,
+                            TimeUnit.SECONDS);
         }
     }
 
+    public void notifySettingsFrameArrival() throws Exception {
+        if (settingsFrameArrived) {
+            return;
+        }
+        settingsFrameArrived = true;
+
+        if (settingsFrameArrivalTimeoutFuture != null) {
+            settingsFrameArrivalTimeoutFuture.cancel(false);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Begin cached channel msg writing when client 
connection preface arrived.");
+        }
+
+        for (CachedMsg cached : cachedMsgList) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Cached channel msg writing, ctx: {} msg: {}", 
cached.ctx, cached.msg);
+            }
+            if (cached.msg instanceof Http2Header) {
+                super.write(cached.ctx, encodeHttp2HeadersFrame(((Http2Header) 
cached.msg)), cached.promise);
+            } else if (cached.msg instanceof Http2OutputMessage) {
+                super.write(cached.ctx, 
encodeHttp2DataFrame(((Http2OutputMessage) cached.msg)), cached.promise);
+            } else {
+                super.write(cached.ctx, cached.msg, cached.promise);
+            }
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("End cached channel msg writing.");
+        }
+
+        cachedMsgList.clear();
+    }
+
     private Http2Header onHttp2HeadersFrame(Http2HeadersFrame headersFrame) {
         return new Http2MetadataFrame(
                 headersFrame.stream().id(), new 
DefaultHttpHeaders(headersFrame.headers()), headersFrame.isEndStream());
@@ -89,4 +178,16 @@ public class NettyHttp2FrameCodec extends 
ChannelDuplexHandler {
         }
         throw new IllegalArgumentException("Http2OutputMessage body must be 
ByteBufOutputStream");
     }
+
+    private static class CachedMsg {
+        private final ChannelHandlerContext ctx;
+        private final Object msg;
+        private final ChannelPromise promise;
+
+        public CachedMsg(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) {
+            this.ctx = ctx;
+            this.msg = msg;
+            this.promise = promise;
+        }
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
new file mode 100644
index 0000000000..bfe885732c
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2SettingsHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.netty4.h2;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+
+/**
+ * Add NettyHttp2SettingsHandler to pipeline because NettyHttp2FrameCodec 
could not receive Http2SettingsFrame.
+ * Http2SettingsFrame does not belong to Http2StreamFrame or Http2GoAwayFrame 
that Http2MultiplexHandler
+ * could process, NettyHttp2FrameCodec is wrapped in Http2MultiplexHandler as 
a child handler.
+ */
+public class NettyHttp2SettingsHandler extends 
SimpleChannelInboundHandler<Http2SettingsFrame> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyHttp2SettingsHandler.class);
+
+    /**
+     * Http2SettingsFrame arrival notification subscribers.
+     */
+    private final Set<NettyHttp2FrameCodec> settingsFrameArrivalSubscribers = 
new HashSet<>();
+
+    private boolean settingsFrameArrived;
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame 
msg) throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Receive client Http2 Settings frame of "
+                    + ctx.channel().localAddress() + " <- " + 
ctx.channel().remoteAddress());
+        }
+        settingsFrameArrived = true;
+
+        // Notify all subscribers that Http2SettingsFrame is arrived.
+        for (NettyHttp2FrameCodec nettyHttp2FrameCodec : 
settingsFrameArrivalSubscribers) {
+            nettyHttp2FrameCodec.notifySettingsFrameArrival();
+        }
+        settingsFrameArrivalSubscribers.clear();
+
+        ctx.pipeline().remove(this);
+    }
+
+    /**
+     * Save Http2SettingsFrame arrival notification subscriber if 
Http2SettingsFrame is not arrived.
+     * @param nettyHttp2FrameCodec the netty HTTP2 frame codec that will be 
notified.
+     * @return true: subscribe successful, false: Http2SettingsFrame arrived.
+     */
+    public boolean subscribeSettingsFrameArrival(NettyHttp2FrameCodec 
nettyHttp2FrameCodec) {
+        if (!settingsFrameArrived) {
+            settingsFrameArrivalSubscribers.add(nettyHttp2FrameCodec);
+            return true;
+        }
+        return false;
+    }
+
+    public void unsubscribeSettingsFrameArrival(NettyHttp2FrameCodec 
nettyHttp2FrameCodec) {
+        settingsFrameArrivalSubscribers.remove(nettyHttp2FrameCodec);
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
index 35eb7bbb58..9e1fbffce7 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/http2/Http2ClientSettingsHandler.java
@@ -39,8 +39,8 @@ public class Http2ClientSettingsHandler extends 
SimpleChannelInboundHandler<Http
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame 
msg) throws Exception {
         if (logger.isDebugEnabled()) {
-            logger.debug("Receive Http2 Settings frame of " + 
ctx.channel().localAddress() + " -> "
-                    + ctx.channel().remoteAddress());
+            logger.debug("Receive server Http2 Settings frame of "
+                    + ctx.channel().localAddress() + " -> " + 
ctx.channel().remoteAddress());
         }
         // connectionPrefaceReceivedPromise will be set null after first used.
         Promise<Void> connectionPrefaceReceivedPromise = 
connectionPrefaceReceivedPromiseRef.get();
@@ -49,6 +49,7 @@ public class Http2ClientSettingsHandler extends 
SimpleChannelInboundHandler<Http
         } else {
             // Notify the connection preface is received when first inbound 
http2 settings frame is arrived.
             connectionPrefaceReceivedPromise.trySuccess(null);
+            ctx.pipeline().remove(this);
         }
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index bbd53f4ab1..eac0646b3a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -31,6 +31,7 @@ import 
org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1Codec;
 import org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1ConnectionHandler;
 import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec;
 import 
org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler;
+import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2SettingsHandler;
 import org.apache.dubbo.remoting.utils.UrlUtils;
 import org.apache.dubbo.remoting.websocket.netty4.WebSocketFrameCodec;
 import 
org.apache.dubbo.remoting.websocket.netty4.WebSocketProtocolSelectorHandler;
@@ -155,12 +156,14 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                 sourceCodec,
                 protocol -> {
                     if 
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, 
protocol)) {
+                        NettyHttp2SettingsHandler nettyHttp2SettingsHandler = 
new NettyHttp2SettingsHandler();
                         return new Http2ServerUpgradeCodec(
                                 buildHttp2FrameCodec(tripleConfig),
+                                nettyHttp2SettingsHandler,
                                 new HttpWriteQueueHandler(),
                                 new FlushConsolidationHandler(64, true),
                                 new TripleServerConnectionHandler(),
-                                buildHttp2MultiplexHandler(url, tripleConfig),
+                                
buildHttp2MultiplexHandler(nettyHttp2SettingsHandler, url, tripleConfig),
                                 new TripleTailHandler());
                     } else if 
(AsciiString.contentEquals(HttpHeaderValues.WEBSOCKET, protocol)) {
                         return new WebSocketServerUpgradeCodec(
@@ -191,12 +194,13 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                 url, frameworkModel, tripleConfig, 
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
     }
 
-    private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url, 
TripleConfig tripleConfig) {
+    private Http2MultiplexHandler buildHttp2MultiplexHandler(
+            NettyHttp2SettingsHandler nettyHttp2SettingsHandler, URL url, 
TripleConfig tripleConfig) {
         return new Http2MultiplexHandler(new 
ChannelInitializer<Http2StreamChannel>() {
             @Override
             protected void initChannel(Http2StreamChannel ch) {
                 ChannelPipeline p = ch.pipeline();
-                p.addLast(new NettyHttp2FrameCodec());
+                p.addLast(new NettyHttp2FrameCodec(nettyHttp2SettingsHandler));
                 p.addLast(new NettyHttp2ProtocolSelectorHandler(
                         url, frameworkModel, tripleConfig, 
GenericHttp2ServerTransportListenerFactory.INSTANCE));
             }
@@ -204,11 +208,13 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
     }
 
     private void configurerHttp2Handlers(URL url, List<ChannelHandler> 
handlers) {
+        NettyHttp2SettingsHandler nettyHttp2SettingsHandler = new 
NettyHttp2SettingsHandler();
         TripleConfig tripleConfig = 
ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
         Http2FrameCodec codec = buildHttp2FrameCodec(tripleConfig);
-        Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url, 
tripleConfig);
+        Http2MultiplexHandler handler = 
buildHttp2MultiplexHandler(nettyHttp2SettingsHandler, url, tripleConfig);
         handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
         handlers.add(new ChannelHandlerPretender(codec));
+        handlers.add(new ChannelHandlerPretender(nettyHttp2SettingsHandler));
         handlers.add(new ChannelHandlerPretender(new 
FlushConsolidationHandler(64, true)));
         handlers.add(new ChannelHandlerPretender(new 
TripleServerConnectionHandler()));
         handlers.add(new ChannelHandlerPretender(handler));

Reply via email to