This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 69b8e2b999c branch-3.0: [fix](cloud-mow) Fix sending committing rpc to 
FE twice problem (#43854)
69b8e2b999c is described below

commit 69b8e2b999ced2f05264bdc033fd9463320961b0
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 13 18:59:25 2024 +0800

    branch-3.0: [fix](cloud-mow) Fix sending committing rpc to FE twice problem 
(#43854)
    
    Cherry-picked from #41395
    
    Co-authored-by: huanghaibin <huanghai...@selectdb.com>
---
 .../transaction/CloudGlobalTransactionMgr.java     | 21 ++++++++++
 .../test_cloud_mow_stream_load_with_timeout.out    |  4 ++
 .../test_cloud_mow_stream_load_with_timeout.groovy | 49 ++++++++++++++++++++--
 3 files changed, 71 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 9a7ee5bc86e..b2d994433d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -475,6 +475,23 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
         List<OlapTable> mowTableList = getMowTableList(tableList, 
tabletCommitInfos);
         if (!mowTableList.isEmpty()) {
+            // This txn may already have been calculated by a previous task whose 
commit rpc timed out,
+            // in which case BE will send another commit request to FE, so check the 
txn status first,
+            // before sending the delete bitmap task to BE: if the txn is committed or 
visible, there is no need to
+            // calculate the delete bitmap again; just return ok to BE to finish 
this commit.
+            TransactionState transactionState = 
Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(dbId, transactionId);
+            if (null != transactionState && null != 
transactionState.getTransactionStatus()) {
+                if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED
+                        || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+                    LOG.info("txn={}, status={} not need to calculate delete 
bitmap again, just return ", transactionId,
+                            
transactionState.getTransactionStatus().toString());
+                    return;
+                } else {
+                    LOG.info("txn={}, status={} need to calculate delete 
bitmap", transactionId,
+                            
transactionState.getTransactionStatus().toString());
+                }
+            }
             calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos);
         }
 
@@ -519,6 +536,10 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         try {
             txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, 
tableList);
             txnOperated = true;
+            if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
 {
+                throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+                        "test delete bitmap update lock timeout, 
transactionId:" + transactionId);
+            }
         } finally {
             if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out
index b8b3ea3ecca..72273f89557 100644
--- 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out
@@ -5,3 +5,7 @@
 5      e       90
 6      f       100
 
+-- !sql --
+5      e       90
+6      f       100
+
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
index 122503b1611..7176aec702f 100644
--- 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
@@ -74,13 +74,13 @@ suite("test_cloud_mow_stream_load_with_timeout", 
"nonConcurrent") {
 
     // store the original value
     get_be_param("mow_stream_load_commit_retry_times")
-    // disable retry to make this problem more clear
-    set_be_param("mow_stream_load_commit_retry_times", "1")
-
 
     def tableName = "tbl_basic"
+    // test fe release lock when calculating delete bitmap timeout
     setFeConfigTemporary(customFeConfig) {
         try {
+            // disable retry to make this problem more clear
+            set_be_param("mow_stream_load_commit_retry_times", "1")
             // create table
             sql """ drop table if exists ${tableName}; """
 
@@ -143,4 +143,47 @@ suite("test_cloud_mow_stream_load_with_timeout", 
"nonConcurrent") {
         }
 
     }
+
+    // test that FE doesn't send the calculate-delete-bitmap task to BE twice when the txn 
is committed or visible
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
+    try {
+        // create table
+        sql """ drop table if exists ${tableName}; """
+
+        sql """
+        CREATE TABLE `${tableName}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(1100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_num" = "1"
+        );
+        """
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'columns', 'id, name, score'
+            file "test_stream_load.csv"
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                log.info("Stream load result: ${result}")
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+            }
+        }
+        qt_sql """ select * from ${tableName} order by id"""
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
+        sql "DROP TABLE IF EXISTS ${tableName};"
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to