This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f5569dc4a16 [improve](fe) Support to config max msg/frame size of the thrift server (#36015)
f5569dc4a16 is described below

commit f5569dc4a165ef61d3ea86c5dd9ae382b31b758f
Author: walter <w41te...@gmail.com>
AuthorDate: Sat Jun 8 23:49:14 2024 +0800

    [improve](fe) Support to config max msg/frame size of the thrift server (#36015)
    
    Cherry-pick #35845
---
 .../main/java/org/apache/doris/common/Config.java  | 10 +++++
 .../java/org/apache/doris/common/ThriftServer.java | 52 +++++++++++++++++++---
 2 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c53f7e82350..c219a6f9656 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -395,6 +395,16 @@ public class Config extends ConfigBase {
             "The connection timeout of thrift client, in milliseconds. 0 means 
no timeout."})
     public static int thrift_client_timeout_ms = 0;
 
+    // The default value is inherited from org.apache.thrift.TConfiguration
+    @ConfField(description = {"thrift server 接收请求大小的上限",
+            "The maximum size of a (received) message of the thrift server, in 
bytes"})
+    public static int thrift_max_message_size = 100 * 1024 * 1024;
+
+    // The default value is inherited from org.apache.thrift.TConfiguration
+    @ConfField(description = {"thrift server transport 接收的每帧数据大小的上限",
+            "The limits of the size of one frame of thrift server transport"})
+    public static int thrift_max_frame_size = 16384000;
+
     @ConfField(description = {"thrift server 的 backlog 数量。"
             + "如果调大这个值,则需同时调整 /proc/sys/net/core/somaxconn 的值",
             "The backlog number of thrift server. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
index 45c4bcb64f0..259d1a25870 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
@@ -23,6 +23,7 @@ import org.apache.doris.thrift.TNetworkAddress;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
@@ -31,10 +32,13 @@ import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -111,18 +115,18 @@ public class ThriftServer {
 
         if (FrontendOptions.isBindIPV6()) {
             socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
-                .bindAddr(new InetSocketAddress("::0", port))
-                .clientTimeout(Config.thrift_client_timeout_ms)
-                .backlog(Config.thrift_backlog_num);
+                    .bindAddr(new InetSocketAddress("::0", port))
+                    .clientTimeout(Config.thrift_client_timeout_ms)
+                    .backlog(Config.thrift_backlog_num);
         } else {
             socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
-                .bindAddr(new InetSocketAddress("0.0.0.0", port))
-                .clientTimeout(Config.thrift_client_timeout_ms)
-                .backlog(Config.thrift_backlog_num);
+                    .bindAddr(new InetSocketAddress("0.0.0.0", port))
+                    .clientTimeout(Config.thrift_client_timeout_ms)
+                    .backlog(Config.thrift_backlog_num);
         }
 
         TThreadPoolServer.Args serverArgs =
-                new TThreadPoolServer.Args(new 
TServerSocket(socketTransportArgs)).protocolFactory(
+                new TThreadPoolServer.Args(new 
ImprovedTServerSocket(socketTransportArgs)).protocolFactory(
                         new 
TBinaryProtocol.Factory(Config.fe_thrift_max_pkg_bytes, 
-1)).processor(processor);
         ThreadPoolExecutor threadPoolExecutor = 
ThreadPoolManager.newDaemonCacheThreadPool(
                 Config.thrift_server_max_worker_threads, "thrift-server-pool", 
true);
@@ -175,4 +179,38 @@ public class ThriftServer {
     public void removeConnect(TNetworkAddress clientAddress) {
         connects.remove(clientAddress);
     }
+
+    static class ImprovedTServerSocket extends TServerSocket {
+        public ImprovedTServerSocket(ServerSocketTransportArgs args) throws 
TTransportException {
+            super(args);
+        }
+
+        public TSocket accept() throws TTransportException {
+            ServerSocket serverSocket = getServerSocket();
+            if (serverSocket == null) {
+                throw new TTransportException(TTransportException.NOT_OPEN, 
"No underlying server socket.");
+            }
+
+            Socket result;
+            try {
+                result = serverSocket.accept();
+            } catch (Exception e) {
+                throw new TTransportException(e);
+            }
+            if (result == null) {
+                throw new TTransportException("Blocking server's accept() may 
not return NULL");
+            }
+
+            TSocket socket = new TSocket(result);
+
+            TConfiguration cfg = socket.getConfiguration();
+            cfg.setMaxMessageSize(Config.thrift_max_message_size);
+            cfg.setMaxFrameSize(Config.thrift_max_frame_size);
+
+            socket.updateKnownMessageSize(0); // Since we update the 
configuration, reset consumed message size.
+            socket.setTimeout(Config.thrift_client_timeout_ms);
+
+            return socket;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to