This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2e4201e4066fec38deb22494248473fd3dbbc9e3 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Wed Mar 6 14:50:27 2024 +0800 [opt](session variable) max_msg_size_of_result_receiver #31809 --- .../main/java/org/apache/doris/qe/Coordinator.java | 3 +- .../java/org/apache/doris/qe/PointQueryExec.java | 19 +++++-- .../java/org/apache/doris/qe/ResultReceiver.java | 22 ++++++-- .../java/org/apache/doris/qe/SessionVariable.java | 14 +++++ .../java/org/apache/doris/qe/StmtExecutor.java | 3 +- .../apache/doris/rpc/TCustomProtocolFactory.java | 41 +++++++++++++++ .../max_msg_size_of_result_receiver.groovy | 61 ++++++++++++++++++++++ 7 files changed, 155 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3ab3d40dc7b..b91fcd15e3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -659,7 +659,8 @@ public class Coordinator implements CoordInterface { if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId, - addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline); + addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline, + context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); if (!context.isReturnResultFromLocal()) { Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 40d244184b2..cf61d33b9a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -35,6 +35,7 @@ import org.apache.doris.proto.InternalService.KeyTuple; import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.rpc.TCustomProtocolFactory; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprList; @@ -85,6 +86,8 @@ public class PointQueryExec implements CoordInterface { // using this ID to find for this prepared statement private UUID cacheID; + private final int maxMsgSizeOfResultReceiver; + private OlapScanNode getPlanRoot() { List<PlanFragment> fragments = planner.getFragments(); PlanFragment fragment = fragments.get(0); @@ -96,7 +99,7 @@ public class PointQueryExec implements CoordInterface { return planRoot; } - public PointQueryExec(Planner planner, Analyzer analyzer) { + public PointQueryExec(Planner planner, Analyzer analyzer, int maxMessageSize) { // init from planner this.planner = planner; List<PlanFragment> fragments = planner.getFragments(); @@ -117,6 +120,7 @@ public class PointQueryExec implements CoordInterface { // TODO // planner.getDescTable().toThrift(); } + this.maxMsgSizeOfResultReceiver = maxMessageSize; } void setScanRangeLocations() throws Exception { @@ -310,8 +314,17 @@ public class PointQueryExec implements CoordInterface { } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { byte[] serialResult = pResult.getRowBatch().toByteArray(); TResultBatch resultBatch = new TResultBatch(); - TDeserializer deserializer = new TDeserializer(); - deserializer.deserialize(resultBatch, serialResult); + TDeserializer deserializer = new TDeserializer( + new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); + try { + deserializer.deserialize(resultBatch, serialResult); + } catch (TException e) { + if (e.getMessage().contains("MaxMessageSize reached")) { + throw new 
TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); + } else { + throw e; + } + } rowBatch.setBatch(resultBatch); rowBatch.setEos(true); return rowBatch; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index a9e9740963f..c473d74b919 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -23,6 +23,7 @@ import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.rpc.TCustomProtocolFactory; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultBatch; import org.apache.doris.thrift.TStatusCode; @@ -53,12 +54,16 @@ public class ResultReceiver { private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null; public String cancelReason = ""; - public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) { + int maxMsgSizeOfResultReceiver; + + public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs, + int maxMsgSizeOfResultReceiver) { this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); this.backendId = backendId; this.address = address; this.timeoutTs = timeoutTs; + this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver; } public RowBatch getNext(Status status) throws TException { @@ -136,8 +141,19 @@ public class ResultReceiver { } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { byte[] serialResult = pResult.getRowBatch().toByteArray(); TResultBatch resultBatch = new TResultBatch(); - TDeserializer deserializer = new 
TDeserializer(); - deserializer.deserialize(resultBatch, serialResult); + TDeserializer deserializer = new TDeserializer( + new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); + try { + deserializer.deserialize(resultBatch, serialResult); + } catch (TException e) { + if (e.getMessage().contains("MaxMessageSize reached")) { + throw new TException( + "MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); + } else { + throw e; + } + } + rowBatch.setBatch(resultBatch); rowBatch.setEos(pResult.getEos()); return rowBatch; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1e137ad1192..164e06e0856 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -45,6 +45,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TConfiguration; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -515,6 +516,8 @@ public class SessionVariable implements Serializable, Writable { public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection"; + public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -1636,6 +1639,13 @@ public class SessionVariable implements Serializable, Writable { "当变量为true时,show processlist命令展示所有fe的连接"}) public boolean showAllFeConnection = false; + @VariableMgr.VarAttr(name = MAX_MSG_SIZE_OF_RESULT_RECEIVER, + description = {"Max message size during result deserialization, change this if you meet error" + + " like \"MaxMessageSize reached\"", + "用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize 
reached\"这样的错误时可以考虑修改该参数"}) + public int maxMsgSizeOfResultReceiver = TConfiguration.DEFAULT_MAX_MESSAGE_SIZE; + + // CLOUD_VARIABLES_BEGIN @VariableMgr.VarAttr(name = CLOUD_CLUSTER) public String cloudCluster = ""; @@ -3450,4 +3460,8 @@ public class SessionVariable implements Serializable, Writable { return this.showAllFeConnection; } + public int getMaxMsgSizeOfResultReceiver() { + return this.maxMsgSizeOfResultReceiver; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 71af58481d3..808ec0de09d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1569,7 +1569,8 @@ public class StmtExecutor { RowBatch batch; CoordInterface coordBase = null; if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) { - coordBase = new PointQueryExec(planner, analyzer); + coordBase = new PointQueryExec(planner, analyzer, + context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); } else { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java new file mode 100644 index 00000000000..ca586df1e39 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.rpc; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TTransport; + +/* +* CustomProtocolFactory can change maxMessageSize of TTransport, +* so that we can transfer message whose field size is very large. +*/ +public class TCustomProtocolFactory implements TProtocolFactory { + private final int maxMessageSize; + + @Override + public TProtocol getProtocol(TTransport tTransport) { + tTransport.getConfiguration().setMaxMessageSize(maxMessageSize); + return new TBinaryProtocol(tTransport); + } + + public TCustomProtocolFactory(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } +} diff --git a/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy new file mode 100644 index 00000000000..e7fead33d90 --- /dev/null +++ b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("max_msg_size_of_result_receiver") { + def MESSAGE_SIZE_BASE=1000*1000; // 1MB + def DEFAULT_MAX_MESSAGE_SIZE = 104000000; // 104MB + def table_name = "max_msg_size_of_result_receiver" + sql """ + DROP TABLE IF EXISTS ${table_name} + """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} (id int, str string) + ENGINE=OLAP DISTRIBUTED BY HASH(id) + PROPERTIES("replication_num"="1") + """ + + sql """ + INSERT INTO ${table_name} VALUES (104, repeat("a", ${MESSAGE_SIZE_BASE * 104})) + """ + + sql """ + INSERT INTO ${table_name} VALUES (105, repeat("a", ${MESSAGE_SIZE_BASE * 105})) + """ + + def with_exception = false + try { + sql "SELECT * FROM ${table_name} WHERE id = 104" + } catch (Exception e) { + with_exception = true + } + assertEquals(with_exception, false) + + try { + sql "SELECT * FROM ${table_name} WHERE id = 105" + } catch (Exception e) { + assertTrue(e.getMessage().contains('MaxMessageSize reached, try increase max_msg_size_of_result_receiver')) + } + + try { + sql "SELECT /*+SET_VAR(max_msg_size_of_result_receiver=${DEFAULT_MAX_MESSAGE_SIZE * 2})*/ * FROM ${table_name} WHERE id = 105" + } catch (Exception e) { + with_exception = true + + } + assertEquals(with_exception, false) + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org