This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 47968c43ae3 branch-3.0: [Fix](cloud-mow) avoid calc delete bitmap tasks on same (txn_id, tablet_id) being executed concurrently (#50847) (#50964)
47968c43ae3 is described below

commit 47968c43ae3492cf16b2aabadb34216e8482c34e
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Fri May 16 20:31:00 2025 +0800

    branch-3.0: [Fix](cloud-mow) avoid calc delete bitmap tasks on same (txn_id, tablet_id) being executed concurrently (#50847) (#50964)
    
    pick https://github.com/apache/doris/pull/50847
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |   5 +
 be/src/cloud/cloud_full_compaction.cpp             |   1 -
 be/src/cloud/cloud_tablet.h                        |   3 +
 be/src/olap/base_tablet.cpp                        |   2 +
 .../transaction/CloudGlobalTransactionMgr.java     |   3 +
 .../cloud/test_cloud_concurrent_calc_dbm_task.out  | Bin 0 -> 267 bytes
 .../test_cloud_concurrent_calc_dbm_task.groovy     | 165 +++++++++++++++++++++
 7 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 910ea9a531b..dc8ecaf26b4 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -152,6 +152,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         return Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
                 "can't get tablet when calculate delete bitmap. tablet_id={}", 
_tablet_id);
     }
+    // After https://github.com/apache/doris/pull/50417, there may be multiple 
calc delete bitmap tasks
+    // with different signatures on the same (txn_id, tablet_id) load in same 
BE. We use _rowset_update_lock
+    // to avoid them being executed concurrently to avoid correctness problem.
+    std::unique_lock wrlock(tablet->get_rowset_update_lock());
+
     int64_t max_version = tablet->max_version_unlocked();
     int64_t t2 = MonotonicMicros();
 
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index 0b6810a414c..1d86210225f 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -333,7 +333,6 @@ Status 
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
     RETURN_IF_ERROR(
             _engine.meta_mgr().get_delete_bitmap_update_lock(*cloud_tablet(), 
-1, initiator));
     RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(cloud_tablet()));
-    std::lock_guard 
rowset_update_lock(cloud_tablet()->get_rowset_update_lock());
     std::lock_guard header_lock(_tablet->get_header_lock());
     for (const auto& it : cloud_tablet()->rowset_map()) {
         int64_t cur_version = it.first.first;
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index bfe6c0a2d69..7958ee4d71a 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -277,6 +277,9 @@ private:
 
     std::mutex _base_compaction_lock;
     std::mutex _cumulative_compaction_lock;
+
+    // To avoid multiple calc delete bitmap tasks on same (txn_id, tablet_id) 
with different
+    // signatures being executed concurrently, we use _rowset_update_lock to 
serialize them
     mutable std::mutex _rowset_update_lock;
 
     // Schema will be merged from all rowsets when sync_rowsets
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 41e8bbf16d8..b6ee6ff3fd0 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1206,6 +1206,8 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
     if (is_partial_update) {
         transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer(
                 *rowset, txn_info->partial_update_info, txn_expiration));
+        
DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.after.create_transient_rs_writer",
+                        DBUG_BLOCK);
         // Partial update might generate new segments when there is conflicts 
while publish, and mark
         // the same key in original segments as delete.
         // When the new segment flush fails or the rowset build fails, the 
deletion marker for the
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c91bcbf7475..2bbb45ccd7b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1046,6 +1046,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 signature = UUID.randomUUID().getLeastSignificantBits();
             }
         }
+        if (DebugPointUtil.isEnable("sendCalcDbmtask.change_signature")) {
+            signature = UUID.randomUUID().getLeastSignificantBits();
+        }
         setTxnLastSignature(dbId, transactionId, signature);
         for (long tableId : mowTableIds) {
             setTableLastTxnId(dbId, tableId, transactionId);
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out
new file mode 100644
index 00000000000..55372a75d9c
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out
 differ
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy
new file mode 100644
index 00000000000..852a5fcde5b
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite("test_cloud_concurrent_calc_dbm_task", "multi_cluster,docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'calculate_delete_bitmap_task_timeout_seconds=10',
+        'mow_calculate_delete_bitmap_retry_times=10',
+        'enable_workload_group=false',
+    ]
+    options.beConfigs += [
+        'enable_debug_points=true'
+    ]
+    // Scenario: partial updates on one MoW tablet from two compute clusters; cluster1's delete bitmap calculation is held while FE keeps changing task signatures
+    docker(options) {
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+
+            def table1 = "test_cloud_concurrent_calc_dbm_task"
+            sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+            sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                    `k1` int NOT NULL,
+                    `c1` int,
+                    `c2` int
+                    )UNIQUE KEY(k1)
+                DISTRIBUTED BY HASH(k1) BUCKETS 1
+                PROPERTIES (
+                    "enable_unique_key_merge_on_write" = "true",
+                    "disable_auto_compaction" = "true",
+                    "replication_num" = "1"); """
+
+            // add a second compute cluster named "cluster1"
+            cluster.addBackend(1, "cluster1")
+            def ret = sql_return_maparray """show clusters"""
+            logger.info("clusters: " + ret)
+            def cluster0 = ret.stream().filter(cluster -> cluster.is_current 
== "TRUE").findFirst().orElse(null)
+            def cluster1 = ret.stream().filter(cluster -> cluster.cluster == 
"cluster1").findFirst().orElse(null)
+            assert cluster0 != null
+            assert cluster1 != null
+            logger.info("cluster0: " + cluster0)
+            logger.info("cluster1: " + cluster1)
+
+            // get the single tablet as seen from cluster0 (the current cluster)
+            def tablets = sql_return_maparray """ show tablets from ${table1}; 
"""
+            logger.info("tablets in cluster 0: " + tablets)
+            assert 1 == tablets.size()
+            def tablet0 = tablets[0]
+            def tablet_id = tablet0.TabletId
+
+            // switch to cluster1 and get the same tablet as seen from it
+            sql """use @${cluster1.cluster}"""
+            def tablets1 = sql_return_maparray """ show tablets from 
${table1}; """
+            logger.info("tablets in cluster 1: " + tablets1)
+            assert 1 == tablets1.size()
+            def tablet1 = tablets1[0]
+
+            def backends = sql_return_maparray """show backends;"""
+            def backend0 = backends.stream().filter(be -> be.BackendId == 
tablet0.BackendId).findFirst().orElse(null)
+            assert backend0 != null
+            logger.info("backend0: " + backend0)
+            def backend1 = backends.stream().filter(be -> be.BackendId == 
tablet1.BackendId).findFirst().orElse(null)
+            assert backend1 != null
+            logger.info("backend1: " + backend1)
+
+            // insert the base rows (k1 = 1..3) through cluster0
+            sql """use @${cluster0.cluster}"""
+            sql """ INSERT INTO ${table1} VALUES (1,1,1),(2,2,2),(3,3,3); """
+            sql "sync;"
+            // read the base rows back through cluster0
+            qt_base_cluster0 "select * from ${table1} order by k1;"
+
+            // read the same rows through cluster1
+            sql """use @${cluster1.cluster}"""
+            qt_base_cluster1 "select * from ${table1} order by k1;"
+
+            def newThreadInDocker = { Closure actionSupplier ->
+                def connInfo = context.threadLocalConn.get()
+                return Thread.start {
+                    connect(connInfo.username, connInfo.password, 
connInfo.conn.getMetaData().getURL(), actionSupplier)
+                }
+            }
+            // FE debug points on the delete bitmap update lock path; they are disabled below to let load 1 succeed
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+
+            // on backend1, block partial-update delete bitmap calculation right after the transient rowset writer is created
+            GetDebugPoint().enableDebugPoint(backend1.Host, backend1.HttpPort 
as int, NodeType.BE, 
"BaseTablet::update_delete_bitmap.after.create_transient_rs_writer")
+
+
+            // partial update load 1 on cluster0: writes c1 for k1 = 1
+            def t1 = newThreadInDocker {
+                sql """use @${cluster0.cluster}"""
+                sql "set enable_unique_key_partial_update=true;"
+                sql "insert into ${table1}(k1,c1) values(1,999);"
+            }
+
+            Thread.sleep(1000)
+
+            // partial update load 2 on cluster1: writes c2 for the same key k1 = 1
+            def t2 = newThreadInDocker {
+                sql """use @${cluster1.cluster}"""
+                sql "set enable_unique_key_partial_update=true;"
+                sql "insert into ${table1}(k1,c2) values(1,888);"
+            }
+            Thread.sleep(1500)
+
+            // disable the FE debug points so partial update load 1 can succeed, then wait for it
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+            t1.join()
+
+            sql """use @${cluster0.cluster}"""
+            qt_upadte1_cluster0 "select * from ${table1} order by k1;"
+
+            // make FE assign a fresh random signature whenever it sends a calc delete bitmap task
+            
GetDebugPoint().enableDebugPointForAllFEs("sendCalcDbmtask.change_signature")
+
+            // keep backend1 blocked for 30s so load 2 produces many calc delete bitmap tasks with different signatures, then unblock it
+            Thread.sleep(30000)
+            GetDebugPoint().disableDebugPoint(backend1.Host, backend1.HttpPort 
as int, NodeType.BE, 
"BaseTablet::update_delete_bitmap.after.create_transient_rs_writer")
+
+            // wait for partial update load 2 to finish
+            t2.join()
+
+            sql """use @${cluster1.cluster}"""
+            qt_upadte2_cluster1 "select * from ${table1} order by k1;"
+            // Exception e1 = null
+            // try {
+            //     sql "select * from ${table1} order by k1;"
+            // } catch (Exception e) {
+            //     e1 = e
+            // }
+            // assert e1 != null && e1.getMessage().contains("failed to read")
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to