This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2e4201e4066fec38deb22494248473fd3dbbc9e3
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed Mar 6 14:50:27 2024 +0800

    [opt](session variable) max_msg_size_of_result_receiver #31809
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  3 +-
 .../java/org/apache/doris/qe/PointQueryExec.java   | 19 +++++--
 .../java/org/apache/doris/qe/ResultReceiver.java   | 22 ++++++--
 .../java/org/apache/doris/qe/SessionVariable.java  | 14 +++++
 .../java/org/apache/doris/qe/StmtExecutor.java     |  3 +-
 .../apache/doris/rpc/TCustomProtocolFactory.java   | 41 +++++++++++++++
 .../max_msg_size_of_result_receiver.groovy         | 61 ++++++++++++++++++++++
 7 files changed, 155 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3ab3d40dc7b..b91fcd15e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -659,7 +659,8 @@ public class Coordinator implements CoordInterface {
         if (topDataSink instanceof ResultSink || topDataSink instanceof 
ResultFileSink) {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
             receiver = new ResultReceiver(queryId, 
topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline);
+                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline,
+                    
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
 
             if (!context.isReturnResultFromLocal()) {
                 
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 40d244184b2..cf61d33b9a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -35,6 +35,7 @@ import org.apache.doris.proto.InternalService.KeyTuple;
 import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprList;
@@ -85,6 +86,8 @@ public class PointQueryExec implements CoordInterface {
     // using this ID to find for this prepared statement
     private UUID cacheID;
 
+    private final int maxMsgSizeOfResultReceiver;
+
     private OlapScanNode getPlanRoot() {
         List<PlanFragment> fragments = planner.getFragments();
         PlanFragment fragment = fragments.get(0);
@@ -96,7 +99,7 @@ public class PointQueryExec implements CoordInterface {
         return planRoot;
     }
 
-    public PointQueryExec(Planner planner, Analyzer analyzer) {
+    public PointQueryExec(Planner planner, Analyzer analyzer, int 
maxMessageSize) {
         // init from planner
         this.planner = planner;
         List<PlanFragment> fragments = planner.getFragments();
@@ -117,6 +120,7 @@ public class PointQueryExec implements CoordInterface {
             // TODO
             // planner.getDescTable().toThrift();
         }
+        this.maxMsgSizeOfResultReceiver = maxMessageSize;
     }
 
     void setScanRangeLocations() throws Exception {
@@ -310,8 +314,17 @@ public class PointQueryExec implements CoordInterface {
         } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
             byte[] serialResult = pResult.getRowBatch().toByteArray();
             TResultBatch resultBatch = new TResultBatch();
-            TDeserializer deserializer = new TDeserializer();
-            deserializer.deserialize(resultBatch, serialResult);
+            TDeserializer deserializer = new TDeserializer(
+                    new 
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+            try {
+                deserializer.deserialize(resultBatch, serialResult);
+            } catch (TException e) {
+                if (e.getMessage().contains("MaxMessageSize reached")) {
+                    throw new TException("MaxMessageSize reached, try increase 
max_msg_size_of_result_receiver");
+                } else {
+                    throw e;
+                }
+            }
             rowBatch.setBatch(resultBatch);
             rowBatch.setEos(true);
             return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index a9e9740963f..c473d74b919 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -23,6 +23,7 @@ import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TStatusCode;
@@ -53,12 +54,16 @@ public class ResultReceiver {
     private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = 
null;
     public String cancelReason = "";
 
-    public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, 
TNetworkAddress address, long timeoutTs) {
+    int maxMsgSizeOfResultReceiver;
+
+    public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, 
TNetworkAddress address, long timeoutTs,
+            int maxMsgSizeOfResultReceiver) {
         this.queryId = 
Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
         this.finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
         this.timeoutTs = timeoutTs;
+        this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
     }
 
     public RowBatch getNext(Status status) throws TException {
@@ -136,8 +141,19 @@ public class ResultReceiver {
                 } else if (pResult.hasRowBatch() && 
pResult.getRowBatch().size() > 0) {
                     byte[] serialResult = pResult.getRowBatch().toByteArray();
                     TResultBatch resultBatch = new TResultBatch();
-                    TDeserializer deserializer = new TDeserializer();
-                    deserializer.deserialize(resultBatch, serialResult);
+                    TDeserializer deserializer = new TDeserializer(
+                            new 
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+                    try {
+                        deserializer.deserialize(resultBatch, serialResult);
+                    } catch (TException e) {
+                        if (e.getMessage().contains("MaxMessageSize reached")) 
{
+                            throw new TException(
+                                "MaxMessageSize reached, try increase 
max_msg_size_of_result_receiver");
+                        } else {
+                            throw e;
+                        }
+                    }
+
                     rowBatch.setBatch(resultBatch);
                     rowBatch.setEos(pResult.getEos());
                     return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 1e137ad1192..164e06e0856 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -45,6 +45,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -515,6 +516,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
 
+    public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = 
"max_msg_size_of_result_receiver";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -1636,6 +1639,13 @@ public class SessionVariable implements Serializable, Writable {
                     "当变量为true时,show processlist命令展示所有fe的连接"})
     public boolean showAllFeConnection = false;
 
+    @VariableMgr.VarAttr(name = MAX_MSG_SIZE_OF_RESULT_RECEIVER,
+            description = {"Max message size during result deserialization, 
change this if you meet error"
+                    + " like \"MaxMessageSize reached\"",
+                    "用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize 
reached\"这样的错误时可以考虑修改该参数"})
+    public int maxMsgSizeOfResultReceiver = 
TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
+
+
     // CLOUD_VARIABLES_BEGIN
     @VariableMgr.VarAttr(name = CLOUD_CLUSTER)
     public String cloudCluster = "";
@@ -3450,4 +3460,8 @@ public class SessionVariable implements Serializable, Writable {
         return this.showAllFeConnection;
     }
 
+    public int getMaxMsgSizeOfResultReceiver() {
+        return this.maxMsgSizeOfResultReceiver;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 71af58481d3..808ec0de09d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1569,7 +1569,8 @@ public class StmtExecutor {
         RowBatch batch;
         CoordInterface coordBase = null;
         if (queryStmt instanceof SelectStmt && ((SelectStmt) 
parsedStmt).isPointQueryShortCircuit()) {
-            coordBase = new PointQueryExec(planner, analyzer);
+            coordBase = new PointQueryExec(planner, analyzer,
+                    
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
         } else {
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
             QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
new file mode 100644
index 00000000000..ca586df1e39
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransport;
+
+/*
+*   CustomProtocolFactory can change maxMessageSize of TTransport,
+*   so that we can transfer message whose filed size is very large.
+*/
+public class TCustomProtocolFactory implements TProtocolFactory {
+    private final int maxMessageSize;
+
+    @Override
+    public TProtocol getProtocol(TTransport tTransport) {
+        tTransport.getConfiguration().setMaxMessageSize(maxMessageSize);
+        return new TBinaryProtocol(tTransport);
+    }
+
+    public TCustomProtocolFactory(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+}
diff --git a/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
new file mode 100644
index 00000000000..e7fead33d90
--- /dev/null
+++ b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("max_msg_size_of_result_receiver") {
+    def MESSAGE_SIZE_BASE=1000*1000; // 1MB
+    def DEFAULT_MAX_MESSAGE_SIZE = 104000000; // 104MB
+    def table_name = "max_msg_size_of_result_receiver"
+    sql """
+        DROP TABLE  IF EXISTS ${table_name}
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (id int, str string)
+        ENGINE=OLAP DISTRIBUTED BY HASH(id)
+        PROPERTIES("replication_num"="1")
+    """
+
+    sql """
+        INSERT INTO ${table_name} VALUES (104, repeat("a", ${MESSAGE_SIZE_BASE 
* 104}))
+    """
+
+    sql """
+        INSERT INTO ${table_name} VALUES (105, repeat("a", ${MESSAGE_SIZE_BASE 
* 105}))
+    """
+
+    def with_exception = false
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 104"
+    } catch (Exception e) {
+        with_exception = true
+    }
+    assertEquals(with_exception, false)
+    
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains('MaxMessageSize reached, try 
increase max_msg_size_of_result_receiver'))
+    }
+
+    try {
+        sql "SELECT 
/*+SET_VAR(max_msg_size_of_result_receiver=${DEFAULT_MAX_MESSAGE_SIZE * 2})*/ * 
FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        with_exception = true
+    
+    }
+    assertEquals(with_exception, false)
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to