This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4e9f14071e0 [fix](path gc) Fix path gc race with publish task (#50343) 4e9f14071e0 is described below commit 4e9f14071e006dc2c086e3f8f19017fc721e520b Author: deardeng <deng...@selectdb.com> AuthorDate: Mon Apr 28 11:04:33 2025 +0800 [fix](path gc) Fix path gc race with publish task (#50343) ### What problem does this PR solve? Fix the path gc race with the publish task and the checkpoint task. If it is not fixed: 1. there is a very low probability of data loss; 2. running the `test_path_gc_with_publish_version` case fails with the following error: ``` java.sql.SQLException: errCode = 2, detailMessage = (175.40.51.1)[NOT_FOUND]failed to get file size /opt/apache-doris/be/storage/1.HDD/data/22/1745498446653/1868719407/0200000000000019214e72da3f1fe5cfc12c5efad2bf05bf_0.dat: No such file or directory ``` --- be/src/olap/data_dir.cpp | 8 ++ be/src/olap/olap_server.cpp | 13 +++- be/src/olap/tablet.cpp | 1 - be/src/olap/task/engine_publish_version_task.cpp | 87 +++++++++++++--------- be/src/olap/txn_manager.cpp | 10 ++- be/src/olap/txn_manager.h | 6 +- be/test/olap/delta_writer_cluster_key_test.cpp | 8 +- be/test/olap/delta_writer_test.cpp | 23 ++++-- .../olap/engine_storage_migration_task_test.cpp | 7 +- be/test/olap/segment_cache_test.cpp | 8 +- be/test/olap/tablet_cooldown_test.cpp | 7 +- be/test/olap/txn_manager_test.cpp | 14 +++- be/test/runtime/snapshot_loader_test.cpp | 4 +- .../test_path_gc_with_publish_version.groovy | 77 +++++++++++++++++++ 14 files changed, 204 insertions(+), 69 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 323288678d5..c8c6285987a 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -754,6 +754,14 @@ void DataDir::_perform_rowset_gc(const std::string& tablet_schema_hash_path) { [&rowsets_in_version_map](auto& rs) { rowsets_in_version_map.insert(rs->rowset_id()); }, true); + DBUG_EXECUTE_IF("DataDir::_perform_rowset_gc.simulation.slow", { + auto target_tablet_id = 
dp->param<int64_t>("tablet_id", -1); + if (target_tablet_id == tablet_id) { + LOG(INFO) << "debug point wait tablet to remove rsmgr tabletId=" << tablet_id; + DBUG_BLOCK; + } + }); + auto reclaim_rowset_file = [](const std::string& path) { auto st = io::global_local_filesystem()->delete_file(path); if (!st.ok()) [[unlikely]] { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 8f73c66e44e..bd0367421c8 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -465,9 +465,17 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { int32_t current_time = time(nullptr); int32_t interval = _auto_get_interval_by_disk_capacity(data_dir); + DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", { + LOG(INFO) << "debug point change interval eq 1ms"; + interval = 1; + while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) { + data_dir->perform_path_gc(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }); if (interval <= 0) { LOG(WARNING) << "path gc thread check interval config is illegal:" << interval - << "will be forced set to half hour"; + << " will be forced set to half hour"; interval = 1800; // 0.5 hour } if (current_time - last_exec_time >= interval) { @@ -483,8 +491,9 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) { int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; do { - LOG(INFO) << "begin to produce tablet meta checkpoint tasks."; for (auto data_dir : data_dirs) { + LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir=" + << data_dir->path(); auto st = _tablet_meta_checkpoint_thread_pool->submit_func( [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); }); if (!st.ok()) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f8650e4f2b1..8b735eacd15 100644 --- 
a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1499,7 +1499,6 @@ bool Tablet::do_tablet_meta_checkpoint() { _newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) { return false; } - // hold read-lock other than write-lock, because it will not modify meta structure std::shared_lock rdlock(_meta_lock); if (tablet_state() != TABLET_RUNNING) { diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 49784cd9e52..2dcc1723b71 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -396,6 +396,49 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine, TabletPublishTxnTask::~TabletPublishTxnTask() = default; +Status publish_version_and_add_rowset(StorageEngine& engine, int64_t partition_id, + const TabletSharedPtr& tablet, const RowsetSharedPtr& rowset, + int64_t transaction_id, const Version& version, + EnginePublishVersionTask* engine_publish_version_task, + TabletPublishStatistics& stats) { + // ATTN: Here, the life cycle needs to be extended to prevent tablet_txn_info.pending_rs_guard in txn + // from being released prematurely, causing path gc to mistakenly delete the dat file + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + + // Publish the transaction + auto result = engine.txn_manager()->publish_txn(partition_id, tablet, transaction_id, version, + &stats, extend_tablet_txn_info_lifetime); + if (!result.ok()) { + LOG(WARNING) << "failed to publish version. 
rowset_id=" << rowset->rowset_id() + << ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id + << ", res=" << result; + if (engine_publish_version_task) { + engine_publish_version_task->add_error_tablet_id(tablet->tablet_id()); + } + return result; + } + + DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK); + + // Add visible rowset to tablet + int64_t start_time = MonotonicMicros(); + result = tablet->add_inc_rowset(rowset); + DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block", + DBUG_BLOCK); + stats.add_inc_rowset_us = MonotonicMicros() - start_time; + if (!result.ok() && !result.is<PUSH_VERSION_ALREADY_EXIST>()) { + LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id() + << ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id + << ", res=" << result; + if (engine_publish_version_task) { + engine_publish_version_task->add_error_tablet_id(tablet->tablet_id()); + } + return result; + } + + return result; +} + void TabletPublishTxnTask::handle() { std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5)); SCOPED_ATTACH_TASK(_mem_tracker); @@ -411,29 +454,14 @@ void TabletPublishTxnTask::handle() { rowset_update_lock.lock(); } _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us; - _result = _engine.txn_manager()->publish_txn(_partition_id, _tablet, _transaction_id, _version, - &_stats); + _result = publish_version_and_add_rowset(_engine, _partition_id, _tablet, _rowset, + _transaction_id, _version, + _engine_publish_version_task, _stats); + if (!_result.ok()) { - LOG(WARNING) << "failed to publish version. 
rowset_id=" << _rowset->rowset_id() - << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id - << ", res=" << _result; - _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); return; } - DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK); - - // add visible rowset to tablet - int64_t t1 = MonotonicMicros(); - _result = _tablet->add_inc_rowset(_rowset); - _stats.add_inc_rowset_us = MonotonicMicros() - t1; - if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) { - LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id() - << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id - << ", res=" << _result; - _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); - return; - } int64_t cost_us = MonotonicMicros() - _stats.submit_time_us; g_tablet_publish_latency << cost_us; _stats.record_in_bvar(); @@ -466,27 +494,14 @@ void AsyncTabletPublishTask::handle() { } RowsetSharedPtr rowset = iter->second; Version version(_version, _version); - auto publish_status = _engine.txn_manager()->publish_txn(_partition_id, _tablet, - _transaction_id, version, &_stats); - if (!publish_status.ok()) { - LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id() - << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id - << ", res=" << publish_status; - return; - } - DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK); + auto publish_status = publish_version_and_add_rowset(_engine, _partition_id, _tablet, rowset, + _transaction_id, version, nullptr, _stats); - // add visible rowset to tablet - int64_t t1 = MonotonicMicros(); - publish_status = _tablet->add_inc_rowset(rowset); - _stats.add_inc_rowset_us = MonotonicMicros() - t1; - if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) { - LOG(WARNING) << "fail to add visible rowset to tablet. 
rowset_id=" << rowset->rowset_id() - << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id - << ", res=" << publish_status; + if (!publish_status.ok()) { return; } + int64_t cost_us = MonotonicMicros() - _stats.submit_time_us; // print stats if publish cost > 500ms g_tablet_publish_latency << cost_us; diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 6197bf1b6b4..fedf3ecde82 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -200,9 +200,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, - TabletPublishStatistics* stats) { + TabletPublishStatistics* stats, + std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) { return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, - tablet->tablet_id(), tablet->tablet_uid(), version, stats); + tablet->tablet_id(), tablet->tablet_uid(), version, stats, + extend_tablet_txn_info); } void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, @@ -457,7 +459,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const Version& version, - TabletPublishStatistics* stats) { + TabletPublishStatistics* stats, + std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) { auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { return Status::OK(); @@ -483,6 +486,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // found load for txn,tablet // case 1: user commit rowset, then the load id must be equal tablet_txn_info = txn_info_iter->second; + extend_tablet_txn_info = tablet_txn_info; rowset = 
tablet_txn_info->rowset; } } diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 95853c09705..7de2d39db1a 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -172,7 +172,8 @@ public: Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, - TabletPublishStatistics* stats); + TabletPublishStatistics* stats, + std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info); // delete the txn from manager if it is not committed(not have a valid rowset) Status rollback_txn(TPartitionId partition_id, const Tablet& tablet, @@ -190,7 +191,8 @@ public: // not persist rowset meta because Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const Version& version, - TabletPublishStatistics* stats); + TabletPublishStatistics* stats, + std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info); // only abort not committed txn void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, diff --git a/be/test/olap/delta_writer_cluster_key_test.cpp b/be/test/olap/delta_writer_cluster_key_test.cpp index 6c4b4d367a3..fdbf6bbc770 100644 --- a/be/test/olap/delta_writer_cluster_key_test.cpp +++ b/be/test/olap/delta_writer_cluster_key_test.cpp @@ -309,9 +309,11 @@ TEST_F(TestDeltaWriterClusterKey, vec_sequence_col) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_related_rs.begin()->second; TabletPublishStatistics pstats; - res = engine_ref->txn_manager()->publish_txn( - meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - tablet_related_rs.begin()->first.tablet_uid, version, &pstats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, + 
tablet_related_rs.begin()->first.tablet_uid, + version, &pstats, extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index e870e3ad6f2..7f6aadd6070 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -669,9 +669,10 @@ TEST_F(TestDeltaWriter, vec_write) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_rs.second; TabletPublishStatistics stats; - res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, - tablet_rs.first.tablet_uid, version, &stats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn( + meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, + tablet_rs.first.tablet_uid, version, &stats, extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first @@ -763,9 +764,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_related_rs.begin()->second; TabletPublishStatistics pstats; - res = engine_ref->txn_manager()->publish_txn( - meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - tablet_related_rs.begin()->first.tablet_uid, version, &pstats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, + tablet_related_rs.begin()->first.tablet_uid, + version, &pstats, extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to 
add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first @@ -911,9 +914,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { std::cout << "start to publish txn" << std::endl; rowset1 = tablet_related_rs.begin()->second; TabletPublishStatistics pstats; + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; res = engine_ref->txn_manager()->publish_txn( meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - tablet_related_rs.begin()->first.tablet_uid, version, &pstats); + tablet_related_rs.begin()->first.tablet_uid, version, &pstats, + extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset1->rowset_id() << ", num rows:" << rowset1->num_rows() @@ -964,9 +969,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1)); TabletPublishStatistics pstats; + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; res = engine_ref->txn_manager()->publish_txn( meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - tablet_related_rs.begin()->first.tablet_uid, version, &pstats); + tablet_related_rs.begin()->first.tablet_uid, version, &pstats, + extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset2->rowset_id() << ", num rows:" << rowset2->num_rows() diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 6d87413e5d3..20d58fc3e30 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -226,9 +226,10 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; TabletPublishStatistics stats; - res = 
engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - tablet->tablet_id(), tablet->tablet_uid(), - version, &stats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn( + meta, write_req.partition_id, write_req.txn_id, tablet->tablet_id(), + tablet->tablet_uid(), version, &stats, extend_tablet_txn_info_lifetime); EXPECT_EQ(Status::OK(), res); res = tablet->add_inc_rowset(rowset); EXPECT_EQ(Status::OK(), res); diff --git a/be/test/olap/segment_cache_test.cpp b/be/test/olap/segment_cache_test.cpp index c527ffddd42..3002f0f7829 100644 --- a/be/test/olap/segment_cache_test.cpp +++ b/be/test/olap/segment_cache_test.cpp @@ -293,9 +293,11 @@ TEST_F(SegmentCacheTest, vec_sequence_col) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_related_rs.begin()->second; TabletPublishStatistics pstats; - res = engine_ref->txn_manager()->publish_txn( - meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - tablet_related_rs.begin()->first.tablet_uid, version, &pstats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, + tablet_related_rs.begin()->first.tablet_uid, + version, &pstats, extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 8826cec7e9c..fbcbb443131 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -376,9 +376,10 @@ static void write_rowset(TabletSharedPtr* tablet, PUniqueId load_id, int64_t rep for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; 
TabletPublishStatistics stats; - st = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - (*tablet)->tablet_id(), (*tablet)->tablet_uid(), - version, &stats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + st = engine_ref->txn_manager()->publish_txn( + meta, write_req.partition_id, write_req.txn_id, (*tablet)->tablet_id(), + (*tablet)->tablet_uid(), version, &stats, extend_tablet_txn_info_lifetime); ASSERT_EQ(Status::OK(), st); st = (*tablet)->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index c7926b771db..eac5ff46d5a 100644 --- a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -326,9 +326,13 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) { ASSERT_TRUE(st.ok()) << st; Version new_version(10, 11); TabletPublishStatistics stats; - st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id, transaction_id, tablet_id, - _tablet_uid, new_version, &stats); - ASSERT_TRUE(st.ok()) << st; + { + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id, transaction_id, + tablet_id, _tablet_uid, new_version, &stats, + extend_tablet_txn_info_lifetime); + ASSERT_TRUE(st.ok()) << st; + } RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); st = RowsetMetaManager::get_rowset_meta(_meta.get(), _tablet_uid, _rowset->rowset_id(), @@ -345,8 +349,10 @@ TEST_F(TxnManagerTest, PublishNotExistedTxn) { Version new_version(10, 11); auto not_exist_txn = transaction_id + 1000; TabletPublishStatistics stats; + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; auto st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id, not_exist_txn, - tablet_id, _tablet_uid, new_version, &stats); + tablet_id, _tablet_uid, new_version, &stats, + extend_tablet_txn_info_lifetime); 
ASSERT_FALSE(st.ok()) << st; } diff --git a/be/test/runtime/snapshot_loader_test.cpp b/be/test/runtime/snapshot_loader_test.cpp index 36af0f5d70d..6aba0e6fe18 100644 --- a/be/test/runtime/snapshot_loader_test.cpp +++ b/be/test/runtime/snapshot_loader_test.cpp @@ -248,7 +248,9 @@ static void add_rowset(int64_t tablet_id, int32_t schema_hash, int64_t partition RowsetSharedPtr rowset = tablet_related_rs.begin()->second; TabletPublishStatistics stats; - res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id, version, &stats); + std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr; + res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id, version, &stats, + extend_tablet_txn_info_lifetime); ASSERT_TRUE(res.ok()) << res; std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first diff --git a/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy b/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy new file mode 100644 index 00000000000..149c2a20177 --- /dev/null +++ b/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite('test_path_gc_with_publish_version', 'docker') { + if (isCloudMode()) { + return + } + + def options = new ClusterOptions() + options.enableDebugPoints() + options.beConfigs += [ + 'path_gc_check=true', + 'path_gc_check_interval_second=1', + 'path_gc_check_step=0', + 'generate_tablet_meta_checkpoint_tasks_interval_secs=1', + 'tablet_meta_checkpoint_min_new_rowsets_num=1', + 'sys_log_verbose_modules=*', + ] + options.feNum = 1 + options.beNum = 1 + + docker(options) { + def be1 = cluster.getBeByIndex(1) + be1.enableDebugPoint('_path_gc_thread_callback.interval.eq.1ms', null) + be1.enableDebugPoint('_path_gc_thread_callback.always.do', null) + + sql "SET GLOBAL insert_visible_timeout_ms = 5000" + // wait path gc interval time to 1ms + Thread.sleep(1000) + + sql """ + CREATE TABLE tbl (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1") + """ + + def result = sql_return_maparray """show tablets from tbl""" + log.info("show tablet result {}", result) + Long tabletId = result.TabletId[0] as Long + + be1.enableDebugPoint('EnginePublishVersionTask.handle.block_add_rowsets', null) + be1.enableDebugPoint('EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block', null) + sql 'INSERT INTO tbl VALUES (1, 10)' + // Rs not in pending + + be1.enableDebugPoint('DataDir::_perform_rowset_gc.simulation.slow', [tablet_id: tabletId]) + Thread.sleep(5000) + + be1.disableDebugPoint('EnginePublishVersionTask.handle.block_add_rowsets') + Thread.sleep(5000) + // publish continue + // checkpoint clean Rs manager + // path gc continue + be1.disableDebugPoint('DataDir::_perform_rowset_gc.simulation.slow') + 
be1.disableDebugPoint('EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block') + Thread.sleep(3 * 1000) + + result = sql """select * from tbl""" + log.info("result = {}", result) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org