This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 47968c43ae3 branch-3.0: [Fix](cloud-mow) avoid calc delete bitmap tasks on same (txn_id, tablet_id) being executed concurrently (#50847) (#50964) 47968c43ae3 is described below commit 47968c43ae3492cf16b2aabadb34216e8482c34e Author: bobhan1 <bao...@selectdb.com> AuthorDate: Fri May 16 20:31:00 2025 +0800 branch-3.0: [Fix](cloud-mow) avoid calc delete bitmap tasks on same (txn_id, tablet_id) being executed concurrently (#50847) (#50964) pick https://github.com/apache/doris/pull/50847 --- .../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 5 + be/src/cloud/cloud_full_compaction.cpp | 1 - be/src/cloud/cloud_tablet.h | 3 + be/src/olap/base_tablet.cpp | 2 + .../transaction/CloudGlobalTransactionMgr.java | 3 + .../cloud/test_cloud_concurrent_calc_dbm_task.out | Bin 0 -> 267 bytes .../test_cloud_concurrent_calc_dbm_task.groovy | 165 +++++++++++++++++++++ 7 files changed, 178 insertions(+), 1 deletion(-) diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 910ea9a531b..dc8ecaf26b4 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -152,6 +152,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { return Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>( "can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id); } + // After https://github.com/apache/doris/pull/50417, there may be multiple calc delete bitmap tasks + // with different signatures on the same (txn_id, tablet_id) load in same BE. We use _rowset_update_lock + // to avoid them being executed concurrently to avoid correctness problem. + std::unique_lock wrlock(tablet->get_rowset_update_lock()); + int64_t max_version = tablet->max_version_unlocked(); int64_t t2 = MonotonicMicros(); diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 0b6810a414c..1d86210225f 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -333,7 +333,6 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t RETURN_IF_ERROR( _engine.meta_mgr().get_delete_bitmap_update_lock(*cloud_tablet(), -1, initiator)); RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(cloud_tablet())); - std::lock_guard rowset_update_lock(cloud_tablet()->get_rowset_update_lock()); std::lock_guard header_lock(_tablet->get_header_lock()); for (const auto& it : cloud_tablet()->rowset_map()) { int64_t cur_version = it.first.first; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index bfe6c0a2d69..7958ee4d71a 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -277,6 +277,9 @@ private: std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; + + // To avoid multiple calc delete bitmap tasks on same (txn_id, tablet_id) with different + // signatures being executed concurrently, we use _rowset_update_lock to serialize them mutable std::mutex _rowset_update_lock; // Schema will be merged from all rowsets when sync_rowsets diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 41e8bbf16d8..b6ee6ff3fd0 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1206,6 +1206,8 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf if (is_partial_update) { transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer( *rowset, txn_info->partial_update_info, txn_expiration)); + DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.after.create_transient_rs_writer", + DBUG_BLOCK); // Partial update might generate new segments when there is conflicts while publish, and mark // the same key in original segments as delete. // When the new segment flush fails or the rowset build fails, the deletion marker for the diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index c91bcbf7475..2bbb45ccd7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1046,6 +1046,9 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { signature = UUID.randomUUID().getLeastSignificantBits(); } } + if (DebugPointUtil.isEnable("sendCalcDbmtask.change_signature")) { + signature = UUID.randomUUID().getLeastSignificantBits(); + } setTxnLastSignature(dbId, transactionId, signature); for (long tableId : mowTableIds) { setTableLastTxnId(dbId, tableId, transactionId); diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out new file mode 100644 index 00000000000..55372a75d9c Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.out differ diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy new file mode 100644 index 00000000000..852a5fcde5b --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_concurrent_calc_dbm_task.groovy @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite("test_cloud_concurrent_calc_dbm_task", "multi_cluster,docker") { + def options = new ClusterOptions() + options.cloudMode = true + options.setFeNum(1) + options.setBeNum(1) + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'calculate_delete_bitmap_task_timeout_seconds=10', + 'mow_calculate_delete_bitmap_retry_times=10', + 'enable_workload_group=false', + ] + options.beConfigs += [ + 'enable_debug_points=true' + ] + + docker(options) { + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def table1 = "test_cloud_concurrent_calc_dbm_task" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + // add cluster1 + cluster.addBackend(1, "cluster1") + def ret = sql_return_maparray """show clusters""" + logger.info("clusters: " + ret) + def cluster0 = ret.stream().filter(cluster -> cluster.is_current == "TRUE").findFirst().orElse(null) + def cluster1 = ret.stream().filter(cluster -> cluster.cluster == "cluster1").findFirst().orElse(null) + assert cluster0 != null + assert cluster1 != null + logger.info("cluster0: " + cluster0) + logger.info("cluster1: " + cluster1) + + // get tablet in cluster0 + def tablets = sql_return_maparray """ show tablets from ${table1}; """ + logger.info("tablets in cluster 0: " + tablets) + assert 1 == tablets.size() + def tablet0 = tablets[0] + def tablet_id = tablet0.TabletId + + // get tablet in cluster1 + sql """use @${cluster1.cluster}""" + def tablets1 = sql_return_maparray """ show tablets from ${table1}; """ + logger.info("tablets in cluster 1: " + tablets1) + assert 1 == tablets1.size() + def tablet1 = tablets1[0] + + def backends = sql_return_maparray """show backends;""" + def backend0 = backends.stream().filter(be -> be.BackendId == tablet0.BackendId).findFirst().orElse(null) + assert backend0 != null + logger.info("backend0: " + backend0) + def backend1 = backends.stream().filter(be -> be.BackendId == tablet1.BackendId).findFirst().orElse(null) + assert backend1 != null + logger.info("backend1: " + backend1) + + // insert data in cluster0 + sql """use @${cluster0.cluster}""" + sql """ INSERT INTO ${table1} VALUES (1,1,1),(2,2,2),(3,3,3); """ + sql "sync;" + // read data from cluster0 + qt_base_cluster0 "select * from ${table1} order by k1;" + + // read data from cluster1 + sql """use @${cluster1.cluster}""" + qt_base_cluster1 "select * from ${table1} order by k1;" + + def newThreadInDocker = { Closure actionSupplier -> + def connInfo = context.threadLocalConn.get() + return Thread.start { + connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier) + } + } + + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + + // block all calc task on be 1 after create transient rs writer + GetDebugPoint().enableDebugPoint(backend1.Host, backend1.HttpPort as int, NodeType.BE, "BaseTablet::update_delete_bitmap.after.create_transient_rs_writer") + + + // partial update load 1 on cluster0 + def t1 = newThreadInDocker { + sql """use @${cluster0.cluster}""" + sql "set enable_unique_key_partial_update=true;" + sql "insert into ${table1}(k1,c1) values(1,999);" + } + + Thread.sleep(1000) + + // partial update load 2 on cluster1 + def t2 = newThreadInDocker { + sql """use @${cluster1.cluster}""" + sql "set enable_unique_key_partial_update=true;" + sql "insert into ${table1}(k1,c2) values(1,888);" + } + Thread.sleep(1500) + + // let partial update load 1 succeed and wait for it + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + t1.join() + + sql """use @${cluster0.cluster}""" + qt_upadte1_cluster0 "select * from ${table1} order by k1;" + + // force FE change signature of calc dbm task + GetDebugPoint().enableDebugPointForAllFEs("sendCalcDbmtask.change_signature") + + // let partial update load 2 generate many calc dbm tasks with different signatures + Thread.sleep(30000) + GetDebugPoint().disableDebugPoint(backend1.Host, backend1.HttpPort as int, NodeType.BE, "BaseTablet::update_delete_bitmap.after.create_transient_rs_writer") + + // wait for partial update load 2 finish + t2.join() + + sql """use @${cluster1.cluster}""" + qt_upadte2_cluster1 "select * from ${table1} order by k1;" + // Exception e1 = null + // try { + // sql "select * from ${table1} order by k1;" + // } catch (Exception e) { + // e1 = e + // } + // assert e1 != null && e1.getMessage().contains("failed to read") + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org