This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9d6f6933027 [fix](thrift) fix RPC result is chaotic when compile by ASAN (#49516) 9d6f6933027 is described below commit 9d6f693302717020a87e3bdc938ed2f93d3cc63c Author: hui lai <lai...@selectdb.com> AuthorDate: Fri Mar 28 20:03:43 2025 +0800 [fix](thrift) fix RPC result is chaotic when compile by ASAN (#49516) **There are two problems when compiling with AddressSanitizer:** 1. The RPC timeout did not meet expectations: a three-minute timeout actually expired within one minute. 2. The commit result is incorrect: the load ran as transaction 21646315238602752, but the commit result refers to transaction 21646508921529344. The original stream load result: ``` { "TxnId": 21646315238602752, "Label": "188e8f85-1891-43bc-be39-6737da2e86d8", "Comment": "", "TwoPhaseCommit": "false", "Status": "Fail", "Message": "[ANALYSIS_ERROR]TStatus: errCode = 2, detailMessage = internal error, commitTxn failed, transactionId:21646508921529344, code:TXN_ALREADY_ABORTED, msg:transaction [21646508921529344] is already aborted, db_id=1739017681561", "NumberTotalRows": 5000, "NumberLoadedRows": 5000, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 589899, "LoadTimeMs": 305186, "BeginTxnTimeMs": 9, "StreamLoadPutTimeMs": 9, "ReadDataTimeMs": 12, "WriteDataTimeMs": 2466, "ReceiveDataTimeMs": 26, "CommitAndPublishTimeMs": 0 } ``` **Reason:** 1. RPC timeout does not meet expectations: In ASAN mode, the Thrift connection is reused and its timeout is not reset, so the effective RPC timeout may be shorter than the expected timeout. 2. Chaotic commit result: In ASAN mode, the Thrift connection is reused without being reopened. If the previous RPC on that connection failed with the following error: ``` errmsg=[THRIFT_RPC_ERROR]failed to call frontend service, FE address=172.20.48.73:9020, reason: THRIFT_EAGAIN (timed out) ``` then the result read by the next load that reuses the connection may actually be the response to the previous request. 
Co-authored-by: Xin Liao <liaoxin...@126.com> --- be/src/util/thrift_rpc_helper.cpp | 8 +- .../apache/doris/service/FrontendServiceImpl.java | 11 +++ ...test_stream_load_commit_result_incorrect.groovy | 94 ++++++++++++++++++++++ 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index 77e7f377658..9dd88da462b 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -30,6 +30,7 @@ #include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" // IWYU pragma: keep +#include "util/debug_points.h" #include "util/network_util.h" namespace apache { @@ -64,6 +65,7 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, std::function<void(ClientConnection<T>&)> callback, int timeout_ms) { TNetworkAddress address = make_network_address(ip, port); Status status; + DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; }); ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status); if (!status.ok()) { LOG(WARNING) << "Connect frontend failed, address=" << address << ", status=" << status; @@ -74,11 +76,6 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, callback(client); } catch (apache::thrift::transport::TTransportException& e) { std::cerr << "thrift error, reason=" << e.what(); -#ifdef ADDRESS_SANITIZER - return Status::RpcError<false>( - "failed to call frontend service, FE address={}:{}, reason: {}", ip, port, - e.what()); -#else LOG(WARNING) << "retrying call frontend service after " << config::thrift_client_retry_interval_ms << " ms, address=" << address << ", reason=" << e.what(); @@ -91,7 +88,6 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, return status; } callback(client); -#endif } } catch (apache::thrift::TException& e) { LOG(WARNING) << "call frontend service failed, address=" << address diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9c9b2eedc75..fda250a7ae9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1597,6 +1597,17 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + if (DebugPointUtil.isEnable("load.commit_timeout")) { + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + LOG.warn("failed to sleep", e); + } + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs("load commit timeout"); + return result; + } + try { if (!loadTxnCommitImpl(request)) { // committed success but not visible diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy new file mode 100644 index 00000000000..7be644ea1fc --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_commit_result_incorrect", "nonConcurrent") { + def tableName = "test_stream_load_commit_result_incorrect" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, + `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, + `v5` int(11) REPLACE_IF_NOT_NULL NULL, + `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, + `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, + `v8` datetime REPLACE_IF_NOT_NULL NULL, + `v9` date REPLACE_IF_NOT_NULL NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), + PARTITION partition_b VALUES [("100000"), ("1000000000")), + PARTITION partition_c VALUES [("1000000000"), ("10000000000")), + PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + GetDebugPoint().enableDebugPointForAllFEs("load.commit_timeout"); + GetDebugPoint().enableDebugPointForAllBEs("thriftRpcHelper.rpc.error"); + try { + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + } + } + } finally { + 
GetDebugPoint().disableDebugPointForAllFEs("load.commit_timeout"); + GetDebugPoint().disableDebugPointForAllBEs("thriftRpcHelper.rpc.error"); + } + + sleep(1000 * 30) + + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals("finished", json.ExistingJobStatus.toLowerCase()) + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org