This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d6f6933027 [fix](thrift) fix RPC result is chaotic when compile by 
ASAN (#49516)
9d6f6933027 is described below

commit 9d6f693302717020a87e3bdc938ed2f93d3cc63c
Author: hui lai <lai...@selectdb.com>
AuthorDate: Fri Mar 28 20:03:43 2025 +0800

    [fix](thrift) fix RPC result is chaotic when compile by ASAN (#49516)
    
    **There are two problems when compiling with AddressSanitizer:**
    1. The RPC timeout did not meet expectations: a three-minute
    timeout actually expired after one minute.
    2. The commit result is incorrect: the load used transaction
    21646315238602752, but the commit result refers to transaction
    21646508921529344. The original stream load result:
    ```
    {
    "TxnId": 21646315238602752,
    "Label": "188e8f85-1891-43bc-be39-6737da2e86d8",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Fail",
    "Message": "[ANALYSIS_ERROR]TStatus: errCode = 2, detailMessage = internal 
error, commitTxn failed, transactionId:21646508921529344, 
code:TXN_ALREADY_ABORTED, msg:transaction
    [21646508921529344] is already aborted, db_id=1739017681561",
    "NumberTotalRows": 5000,
    "NumberLoadedRows": 5000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 589899,
    "LoadTimeMs": 305186,
    "BeginTxnTimeMs": 9,
    "StreamLoadPutTimeMs": 9,
    "ReadDataTimeMs": 12,
    "WriteDataTimeMs": 2466,
    "ReceiveDataTimeMs": 26,
    "CommitAndPublishTimeMs": 0
    }
    ```
    
    **Reason:**
    1. RPC timeout does not meet expectations: in ASAN mode, the Thrift
    connection is reused and its timeout is not reset, so the effective
    RPC timeout may be shorter than the expected timeout.
    2. Chaotic commit result: in ASAN mode, the Thrift connection is
    reused without being reopened. If the previous RPC on that
    connection failed with the following error:
    ```
    errmsg=[THRIFT_RPC_ERROR]failed to call frontend service, FE 
address=172.20.48.73:9020, reason: THRIFT_EAGAIN (timed out)
    ```
    the result read by the next load that reuses the connection may
    actually be the response to the previous request.
    
    Co-authored-by: Xin Liao <liaoxin...@126.com>
---
 be/src/util/thrift_rpc_helper.cpp                  |  8 +-
 .../apache/doris/service/FrontendServiceImpl.java  | 11 +++
 ...test_stream_load_commit_result_incorrect.groovy | 94 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 6 deletions(-)

diff --git a/be/src/util/thrift_rpc_helper.cpp 
b/be/src/util/thrift_rpc_helper.cpp
index 77e7f377658..9dd88da462b 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h" // IWYU pragma: keep
+#include "util/debug_points.h"
 #include "util/network_util.h"
 
 namespace apache {
@@ -64,6 +65,7 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const 
int32_t port,
                             std::function<void(ClientConnection<T>&)> 
callback, int timeout_ms) {
     TNetworkAddress address = make_network_address(ip, port);
     Status status;
+    DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
     ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, 
timeout_ms, &status);
     if (!status.ok()) {
         LOG(WARNING) << "Connect frontend failed, address=" << address << ", 
status=" << status;
@@ -74,11 +76,6 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const 
int32_t port,
             callback(client);
         } catch (apache::thrift::transport::TTransportException& e) {
             std::cerr << "thrift error, reason=" << e.what();
-#ifdef ADDRESS_SANITIZER
-            return Status::RpcError<false>(
-                    "failed to call frontend service, FE address={}:{}, 
reason: {}", ip, port,
-                    e.what());
-#else
             LOG(WARNING) << "retrying call frontend service after "
                          << config::thrift_client_retry_interval_ms << " ms, 
address=" << address
                          << ", reason=" << e.what();
@@ -91,7 +88,6 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const 
int32_t port,
                 return status;
             }
             callback(client);
-#endif
         }
     } catch (apache::thrift::TException& e) {
         LOG(WARNING) << "call frontend service failed, address=" << address
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9c9b2eedc75..fda250a7ae9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1597,6 +1597,17 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return result;
         }
 
+        if (DebugPointUtil.isEnable("load.commit_timeout")) {
+            try {
+                Thread.sleep(60 * 1000);
+            } catch (InterruptedException e) {
+                LOG.warn("failed to sleep", e);
+            }
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs("load commit timeout");
+            return result;
+        }
+
         try {
             if (!loadTxnCommitImpl(request)) {
                 // committed success but not visible
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy
new file mode 100644
index 00000000000..7be644ea1fc
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_commit_result_incorrect.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_commit_result_incorrect", "nonConcurrent") {
+    def tableName = "test_stream_load_commit_result_incorrect"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+            `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+            `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+            `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+            `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+            `v8` datetime REPLACE_IF_NOT_NULL NULL,
+            `v9` date REPLACE_IF_NOT_NULL NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+        PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    GetDebugPoint().enableDebugPointForAllFEs("load.commit_timeout");
+    GetDebugPoint().enableDebugPointForAllBEs("thriftRpcHelper.rpc.error");
+    try {
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '\t'
+            set 'columns', 'k1, k2, v2, v10, v11'
+            set 'partitions', 'partition_a, partition_b, partition_c, 
partition_d'
+            set 'strict_mode', 'true'
+
+            file 'test_strict_mode.csv'
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+            }
+        }
+    } finally {
+        GetDebugPoint().disableDebugPointForAllFEs("load.commit_timeout");
+        
GetDebugPoint().disableDebugPointForAllBEs("thriftRpcHelper.rpc.error");
+    }
+
+    sleep(1000 * 30)
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+
+        file 'test_strict_mode.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals("finished", json.ExistingJobStatus.toLowerCase())
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to