This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3759b58543d [fix](cloud-mow)Fix not release delete bitmap lock when 
checking txn state is visible #47580 (#47652)
3759b58543d is described below

commit 3759b58543d28aa4b46b094994457ecc4150d679
Author: huanghaibin <huanghai...@selectdb.com>
AuthorDate: Sat Feb 8 21:06:19 2025 +0800

    [fix](cloud-mow)Fix not release delete bitmap lock when checking txn state 
is visible #47580 (#47652)
    
    ### What problem does this PR solve?
    Cherry-pick of #47580 onto branch-3.0.
---
 .../transaction/CloudGlobalTransactionMgr.java     | 59 +++++++++++++++-------
 ...t_cloud_mow_stream_load_with_commit_fail.groovy |  3 ++
 .../test_cloud_mow_stream_load_with_timeout.groovy | 20 ++++++++
 3 files changed, 65 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 849a05f2c15..2b564e385c5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -354,6 +354,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     
tableList.stream().map(Table::getId).collect(Collectors.toList()));
             Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = null;
             if (!mowTableList.isEmpty()) {
+                if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
+                    return;
+                }
                 DeleteBitmapUpdateLockContext lockContext = new 
DeleteBitmapUpdateLockContext();
                 getDeleteBitmapUpdateLock(transactionId, mowTableList, 
tabletCommitInfos, lockContext);
                 if (lockContext.getBackendToPartitionTablets().isEmpty()) {
@@ -374,6 +377,40 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
+    private boolean checkTransactionStateBeforeCommit(long dbId, long 
transactionId)
+            throws TransactionCommitFailedException {
+        // If this txn was already calculated by a previous task but the commit rpc 
timed out,
+        // the BE will send another commit request to the FE. So if the txn is already 
committed or visible,
+        // there is no need to calculate the delete bitmap again; just return ok so the BE 
can finish this commit.
+        TransactionState transactionState = 
Env.getCurrentGlobalTransactionMgr()
+                .getTransactionState(dbId, transactionId);
+        if (transactionState == null) {
+            throw new TransactionCommitFailedException("txn does not exist: " 
+ transactionId);
+        }
+        if (null != transactionState.getTransactionStatus()) {
+            if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+                throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                        + "] is already aborted. abort reason: " + 
transactionState.getReason());
+            } else if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED
+                    || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+                LOG.info("txn={}, status={} not need to calculate delete 
bitmap again, just return ",
+                        transactionId,
+                        transactionState.getTransactionStatus().toString());
+                return false;
+            } else if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE) {
+                LOG.info("txn={}, status={} need to calculate delete bitmap", 
transactionId,
+                        transactionState.getTransactionStatus().toString());
+                return true;
+            } else {
+                throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                        + "] status is: " + 
transactionState.getTransactionStatus().toString());
+            }
+        } else {
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] status is null ");
+        }
+    }
+
     /**
      * Post process of commitTxn
      * 1. update some stats
@@ -499,23 +536,6 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         if (!mowTableList.isEmpty()) {
-            // may be this txn has been calculated by previously task but 
commit rpc is timeout,
-            // and be will send another commit request to fe, so need to check 
txn status first
-            // before sending delete bitmap task to be, if txn is committed or 
visible, no need to
-            // calculate delete bitmap again, just return ok to be to finish 
this commit.
-            TransactionState transactionState = 
Env.getCurrentGlobalTransactionMgr()
-                    .getTransactionState(dbId, transactionId);
-            if (null != transactionState && null != 
transactionState.getTransactionStatus()) {
-                if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED
-                        || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                    LOG.info("txn={}, status={} not need to calculate delete 
bitmap again, just return ", transactionId,
-                            
transactionState.getTransactionStatus().toString());
-                    return;
-                } else {
-                    LOG.info("txn={}, status={} need to calculate delete 
bitmap", transactionId,
-                            
transactionState.getTransactionStatus().toString());
-                }
-            }
             sendCalcDeleteBitmaptask(dbId, transactionId, 
backendToPartitionInfos);
         }
 
@@ -1162,6 +1182,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     public void commitTransaction2PC(Database db, List<Table> tableList, long 
transactionId, long timeoutMillis)
             throws UserException {
         List<OlapTable> mowTableList = getMowTableList(tableList, null);
+        if (!mowTableList.isEmpty()) {
+            if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) 
{
+                return;
+            }
+        }
         commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
null, null, true, mowTableList, null);
     }
 
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
index fa71c3644f2..0ab20324d72 100644
--- 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
@@ -16,6 +16,9 @@
 // under the License.
 
 suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
     GetDebugPoint().clearDebugPointsForAllFEs()
 
     def backendId_to_backendIP = [:]
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
index 7176aec702f..8ffc04dd735 100644
--- 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
@@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", 
"nonConcurrent") {
             }
         }
         qt_sql """ select * from ${tableName} order by id"""
+        def now = System.currentTimeMillis()
+        
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'columns', 'id, name, score'
+            file "test_stream_load.csv"
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                log.info("Stream load result: ${result}")
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+
+            }
+        }
+        def time_diff = System.currentTimeMillis() - now
+        assertTrue(time_diff < 10000)
     } finally {
         
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
         sql "DROP TABLE IF EXISTS ${tableName};"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to