github-actions[bot] commented on code in PR #25921: URL: https://github.com/apache/doris/pull/25921#discussion_r1371962965
########## be/src/olap/data_dir.cpp: ########## @@ -627,74 +626,61 @@ } void DataDir::perform_path_gc() { - std::unique_lock<std::mutex> lck(_check_path_mutex); - _check_path_cv.wait(lck, [this] { - return _stop_bg_worker || !_all_tablet_schemahash_paths.empty() || - !_all_check_paths.empty(); - }); - if (_stop_bg_worker) { - return; - } - - _perform_path_gc_by_tablet(); - _perform_path_gc_by_rowsetid(); + auto tablet_paths = _perform_path_scan(); + _perform_path_gc_by_tablet(tablet_paths); + _perform_path_gc_by_rowset(tablet_paths); } // gc unused tablet schemahash dir -void DataDir::_perform_path_gc_by_tablet() { - if (_all_tablet_schemahash_paths.empty()) { +void DataDir::_perform_path_gc_by_tablet(std::vector<std::string>& tablet_paths) { + if (_stop_bg_worker) return; + if (tablet_paths.empty()) { return; } - LOG(INFO) << "start to path gc by tablet schemahash."; + LOG(INFO) << "start to path gc by tablet, dir=" << _path; + // Use double pointer to avoid move elements after erase a element + auto forward = tablet_paths.begin(); + auto backward = tablet_paths.end(); int counter = 0; - for (const auto& path : _all_tablet_schemahash_paths) { - ++counter; + do { if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { std::this_thread::sleep_for( std::chrono::milliseconds(config::path_gc_check_step_interval_ms)); } + auto& path = *forward; TTabletId tablet_id = -1; TSchemaHash schema_hash = -1; bool is_valid = _tablet_manager->get_tablet_id_and_schema_hash_from_path(path, &tablet_id, &schema_hash); - if (!is_valid) { + if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] { LOG(WARNING) << "unknown path:" << path; + --backward; + std::swap(*forward, *backward); continue; } - // should not happen, because already check it is a valid tablet schema hash path in previous step - // so that log fatal here - if (tablet_id < 1 || schema_hash < 1) { - LOG(WARNING) << "invalid tablet id " << tablet_id << " or schema hash " << schema_hash - << ", path=" << path; - continue; - } - TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); - if (tablet != nullptr) { - // could find the tablet, then skip check it - continue; - } - std::string data_dir_path = - io::Path(path).parent_path().parent_path().parent_path().parent_path(); - DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_path); - if (data_dir == nullptr) { - LOG(WARNING) << "could not find data dir for tablet path " << path; + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (!tablet) { + _tablet_manager->try_delete_unused_tablet_path(this, tablet_id, schema_hash, path); + --backward; + std::swap(*forward, *backward); continue; } - _tablet_manager->try_delete_unused_tablet_path(data_dir, tablet_id, schema_hash, path); - } - _all_tablet_schemahash_paths.clear(); - LOG(INFO) << "finished one time path gc by tablet."; + // could find the tablet, then skip check it + ++forward; + } while (forward != backward && !_stop_bg_worker); + tablet_paths.erase(backward, tablet_paths.end()); + LOG(INFO) << "finish path gc by tablet, dir=" << _path; } -void DataDir::_perform_path_gc_by_rowsetid() { - // init the set of valid path - // validate the path in data dir - if (_all_check_paths.empty()) { +void DataDir::_perform_path_gc_by_rowset(const std::vector<std::string>& tablet_paths) { Review Comment: warning: method '_perform_path_gc_by_rowset' can be made static [readability-convert-member-functions-to-static] ```suggestion static void DataDir::_perform_path_gc_by_rowset(const std::vector<std::string>& tablet_paths) { ``` ########## be/src/olap/data_dir.cpp: ########## @@ -627,74 +626,61 @@ } void DataDir::perform_path_gc() { - std::unique_lock<std::mutex> lck(_check_path_mutex); - _check_path_cv.wait(lck, [this] { - return _stop_bg_worker || !_all_tablet_schemahash_paths.empty() || - !_all_check_paths.empty(); - }); - if (_stop_bg_worker) { - return; - } - - _perform_path_gc_by_tablet(); - _perform_path_gc_by_rowsetid(); + auto tablet_paths = _perform_path_scan(); + _perform_path_gc_by_tablet(tablet_paths); + _perform_path_gc_by_rowset(tablet_paths); } // gc unused tablet schemahash dir -void DataDir::_perform_path_gc_by_tablet() { - if (_all_tablet_schemahash_paths.empty()) { +void DataDir::_perform_path_gc_by_tablet(std::vector<std::string>& tablet_paths) { + if (_stop_bg_worker) return; + if (tablet_paths.empty()) { return; } - LOG(INFO) << "start to path gc by tablet schemahash."; + LOG(INFO) << "start to path gc by tablet, dir=" << _path; + // Use double pointer to avoid move elements after erase a element + auto forward = tablet_paths.begin(); + auto backward = tablet_paths.end(); int counter = 0; - for (const auto& path : _all_tablet_schemahash_paths) { - ++counter; + do { if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { std::this_thread::sleep_for( std::chrono::milliseconds(config::path_gc_check_step_interval_ms)); } + auto& path = *forward; TTabletId tablet_id = -1; TSchemaHash schema_hash = -1; bool is_valid = _tablet_manager->get_tablet_id_and_schema_hash_from_path(path, &tablet_id, &schema_hash); - if (!is_valid) { + if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] { LOG(WARNING) << "unknown path:" << path; + --backward; + std::swap(*forward, *backward); continue; } - // should not happen, because already check it is a valid tablet schema hash path in previous step - // so that log fatal here - if (tablet_id < 1 || schema_hash < 1) { - LOG(WARNING) << "invalid tablet id " << tablet_id << " or schema hash " << schema_hash - << ", path=" << path; - continue; - } - TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); - if (tablet != nullptr) { - // could find the tablet, then skip check it - continue; - } - std::string data_dir_path = - io::Path(path).parent_path().parent_path().parent_path().parent_path(); - DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_path); - if (data_dir == nullptr) { - LOG(WARNING) << "could not find data dir for tablet path " << path; + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (!tablet) { + _tablet_manager->try_delete_unused_tablet_path(this, tablet_id, schema_hash, path); + --backward; + std::swap(*forward, *backward); continue; } - _tablet_manager->try_delete_unused_tablet_path(data_dir, tablet_id, schema_hash, path); - } - _all_tablet_schemahash_paths.clear(); - LOG(INFO) << "finished one time path gc by tablet."; + // could find the tablet, then skip check it + ++forward; + } while (forward != backward && !_stop_bg_worker); + tablet_paths.erase(backward, tablet_paths.end()); + LOG(INFO) << "finish path gc by tablet, dir=" << _path; } -void DataDir::_perform_path_gc_by_rowsetid() { - // init the set of valid path - // validate the path in data dir - if (_all_check_paths.empty()) { +void DataDir::_perform_path_gc_by_rowset(const std::vector<std::string>& tablet_paths) { + if (_stop_bg_worker) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (_stop_bg_worker) { return; } ``` ########## be/src/olap/data_dir.cpp: ########## @@ -704,55 +690,112 @@ TSchemaHash schema_hash = -1; bool is_valid = _tablet_manager->get_tablet_id_and_schema_hash_from_path(path, &tablet_id, &schema_hash); - if (!is_valid) { + if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] { LOG(WARNING) << "unknown path:" << path; continue; } - if (tablet_id > 0 && schema_hash > 0) { - // tablet schema hash path or rowset file path - // gc thread should get tablet include deleted tablet - // or it will delete rowset file before tablet is garbage collected - RowsetId rowset_id; - bool is_rowset_file = TabletManager::get_rowset_id_from_path(path, &rowset_id); - if (is_rowset_file) { - TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); - if (tablet != nullptr) { - if (!tablet->check_rowset_id(rowset_id) && - !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { - _process_garbage_path(path); - } + bool exists; + std::vector<io::FileInfo> files; + if (_stop_bg_worker) break; + auto st = io::global_local_filesystem()->list(path, true, &files, &exists); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << "fail to list tablet path " << path << " : " << st; + continue; + } + std::unordered_set<std::string> useful_rowsets; + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (!tablet) { + // Could not found the tablet, maybe it's a dropped tablet, will be reclaimed + // in the next time `_perform_path_gc_by_tablet` + continue; + } + tablet->traverse_rowsets( + [&useful_rowsets](auto& rs) { useful_rowsets.insert(rs->rowset_id().to_string()); }, + true); + std::unordered_map<std::string, bool> checked_rowsets; // rowset_id -> is_garbage + // FIXME(plat1ko): Abstract a common function in `olap_common.h` + auto get_rowset_id = [](std::string_view filename) -> std::string { + if (filename.ends_with(".dat")) { + // filename format: {rowset_id}_{segment_num}.dat + auto end = filename.rfind('_'); + if (end == std::string::npos) { + return ""; + } + return std::string(filename.substr(0, end)); + } + if (filename.ends_with(".idx")) { + // filename format: {rowset_id}_{segment_num}_{index_id}.idx + auto end = filename.find('_'); + if (end == std::string::npos) { + return ""; } + return std::string(filename.substr(0, end)); + } + return ""; + }; + auto reclaim_rowset_file = [](const std::string& path) { + auto st = io::global_local_filesystem()->delete_file(path); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << "failed to delete garbage rowset file: " << st; + return; + } + LOG(INFO) << "delete garbage path: " << path; // Audit log + }; + auto should_reclaim = [&, this](const std::string& rowset_id) { + return !useful_rowsets.contains(rowset_id) && + !StorageEngine::instance()->pending_local_rowsets().contains(rowset_id) && + !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id) && + RowsetMetaManager::exists(get_meta(), tablet->tablet_uid(), rowset_id) + .is<META_KEY_NOT_FOUND>(); + }; + for (auto&& file : files) { + auto rowset_id = get_rowset_id(file.file_name); + if (rowset_id.empty()) { + continue; // Not a rowset + } + auto find_it = checked_rowsets.find(rowset_id); + if (find_it != checked_rowsets.end()) { + if (find_it->second) { + reclaim_rowset_file(path + '/' + file.file_name); + } + continue; + } + // Check rowset useful + if (should_reclaim(rowset_id)) { + reclaim_rowset_file(path + '/' + file.file_name); + checked_rowsets.emplace(std::move(rowset_id), true); + } else { + checked_rowsets.emplace(std::move(rowset_id), false); } } } - _all_check_paths.clear(); - LOG(INFO) << "finished one time path gc by rowsetid."; + LOG(INFO) << "finish path gc by rowset."; } // path producer -Status DataDir::perform_path_scan() { - std::unique_lock<std::mutex> lck(_check_path_mutex); - if (!_all_check_paths.empty()) { - LOG(INFO) << "_all_check_paths is not empty when path scan."; - return Status::OK(); - } - LOG(INFO) << "start to scan data dir path:" << _path; +std::vector<std::string> DataDir::_perform_path_scan() { + std::vector<std::string> tablet_paths; + if (_stop_bg_worker) return tablet_paths; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (_stop_bg_worker) { return tablet_paths; } ``` ########## be/src/olap/single_replica_compaction.h: ########## @@ -62,6 +63,9 @@ class SingleReplicaCompaction : public Compaction { CompactionType _compaction_type; DISALLOW_COPY_AND_ASSIGN(SingleReplicaCompaction); + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/olap/single_replica_compaction.h:47:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/olap/data_dir.cpp: ########## @@ -627,74 +626,61 @@ Status DataDir::load() { } void DataDir::perform_path_gc() { - std::unique_lock<std::mutex> lck(_check_path_mutex); - _check_path_cv.wait(lck, [this] { - return _stop_bg_worker || !_all_tablet_schemahash_paths.empty() || - !_all_check_paths.empty(); - }); - if (_stop_bg_worker) { - return; - } - - _perform_path_gc_by_tablet(); - _perform_path_gc_by_rowsetid(); + auto tablet_paths = _perform_path_scan(); + _perform_path_gc_by_tablet(tablet_paths); + _perform_path_gc_by_rowset(tablet_paths); } // gc unused tablet schemahash dir -void DataDir::_perform_path_gc_by_tablet() { - if (_all_tablet_schemahash_paths.empty()) { +void DataDir::_perform_path_gc_by_tablet(std::vector<std::string>& tablet_paths) { + if (_stop_bg_worker) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (_stop_bg_worker) { return; } ``` ########## be/src/olap/rowset/rowset_meta_manager.cpp: ########## @@ -47,7 +47,12 @@ bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, } Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { - std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); + return exists(meta, tablet_uid, rowset_id.to_string()); +} + +Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, Review Comment: warning: method 'exists' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, ``` ########## be/src/olap/storage_engine.cpp: ########## @@ -1245,9 +1244,9 @@ Status StorageEngine::execute_task(EngineTask* task) { } // check whether any unused rowsets's id equal to rowset_id -bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { +bool StorageEngine::check_rowset_id_in_unused_rowsets(const std::string& rowset_id) { Review Comment: warning: method 'check_rowset_id_in_unused_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion s id equal to rowset_idstatic ``` ########## be/src/olap/tablet.h: ########## @@ -516,11 +514,16 @@ class Tablet final : public BaseTablet { RowsetSharedPtr get_rowset(const RowsetId& rowset_id); - void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor) { + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, Review Comment: warning: method 'traverse_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion static void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, ``` ########## be/src/olap/txn_manager.cpp: ########## @@ -62,6 +62,36 @@ using std::vector; namespace doris { using namespace ErrorCode; +struct TabletTxnInfo { + PUniqueId load_id; + RowsetSharedPtr rowset; + PendingRowsetGuard pending_rs_guard; + bool unique_key_merge_on_write {false}; + DeleteBitmapPtr delete_bitmap; + // records rowsets calc in commit txn + RowsetIdUnorderedSet rowset_ids; + int64_t creation_time; + bool ingest {false}; + std::shared_ptr<PartialUpdateInfo> partial_update_info; + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) + : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) + : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, + DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) + : load_id(load_id), + rowset(rowset), + unique_key_merge_on_write(merge_on_write), + delete_bitmap(delete_bitmap), + rowset_ids(ids), + creation_time(UnixSeconds()) {} + + TabletTxnInfo() {} Review Comment: warning: use '= default' to define a trivial default constructor [modernize-use-equals-default] ```suggestion TabletTxnInfo() = default; ``` ########## be/src/olap/tablet.h: ########## @@ -516,11 +514,16 @@ RowsetSharedPtr get_rowset(const RowsetId& rowset_id); - void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor) { + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, + bool include_stale = false) { std::shared_lock rlock(_meta_lock); for (auto& [v, rs] : _rs_version_map) { visitor(rs); } + if (!include_stale) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (!include_stale) { return; } ``` ########## be/test/olap/txn_manager_test.cpp: ########## @@ -100,6 +100,14 @@ class TxnManagerTest : public testing::Test { _schema->init_from_pb(tablet_schema_pb); } + void create_tablet() { Review Comment: warning: method 'create_tablet' can be made static [readability-convert-member-functions-to-static] ```suggestion static void create_tablet() { ``` ########## be/test/olap/txn_manager_test.cpp: ########## @@ -194,137 +202,153 @@ // 2. commit txn // 3. should be success TEST_F(TxnManagerTest, CommitTxnWithPrepare) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - static_cast<void>(_txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false)); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. commit without prepare // 2. should success TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. commit twice with different rowset id // 2. should failed TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - load_id, _rowset_diff_id, false); - EXPECT_TRUE(status != Status::OK()); + auto rowset_id1 = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard1 {rowset_id1}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard1), false); + ASSERT_TRUE(st.ok()) << st; + auto rowset_id2 = _rowset_diff_id->rowset_id().to_string(); + PendingRowsetGuard guard2 {rowset_id2}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset_diff_id, std::move(guard2), false); + ASSERT_FALSE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id1)); + EXPECT_FALSE(k_engine->pending_local_rowsets().contains(rowset_id2)); } // 1. commit twice with same rowset id // 2. should success TEST_F(TxnManagerTest, CommitTxnTwiceWithSameRowsetId) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - load_id, _rowset_same_id, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard1 {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard1), false); + ASSERT_TRUE(st.ok()) << st; + PendingRowsetGuard guard2 {rowset_id}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset_same_id, std::move(guard2), false); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. prepare twice should be success TEST_F(TxnManagerTest, PrepareNewTxnTwice) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; } // 1. txn could be rollbacked if it is not committed TEST_F(TxnManagerTest, RollbackNotCommittedTxn) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status != Status::OK()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_FALSE(st.ok()) << st; } // 1. txn could not be rollbacked if it is committed TEST_F(TxnManagerTest, RollbackCommittedTxn) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); - EXPECT_FALSE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); + ASSERT_FALSE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. publish version success TEST_F(TxnManagerTest, PublishVersionSuccessful) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; Version new_version(10, 11); TabletPublishStatistics stats; - status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - new_version, &stats); - EXPECT_TRUE(status == Status::OK()); + st = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + new_version, &stats); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); - // FIXME(Drogon): these is wrong when not real tablet exist - // EXPECT_EQ(rowset_meta->start_version(), 10); - // EXPECT_EQ(rowset_meta->end_version(), 11); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_FALSE(k_engine->pending_local_rowsets().contains(rowset_id)); + EXPECT_EQ(rowset_meta->start_version(), 10); + EXPECT_EQ(rowset_meta->end_version(), 11); Review Comment: warning: 11 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp EXPECT_EQ(rowset_meta->end_version(), 11); ^ ``` ########## be/test/olap/txn_manager_test.cpp: ########## @@ -194,137 +202,153 @@ // 2. commit txn // 3. should be success TEST_F(TxnManagerTest, CommitTxnWithPrepare) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - static_cast<void>(_txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false)); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. commit without prepare // 2. should success TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. commit twice with different rowset id // 2. should failed TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - load_id, _rowset_diff_id, false); - EXPECT_TRUE(status != Status::OK()); + auto rowset_id1 = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard1 {rowset_id1}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard1), false); + ASSERT_TRUE(st.ok()) << st; + auto rowset_id2 = _rowset_diff_id->rowset_id().to_string(); + PendingRowsetGuard guard2 {rowset_id2}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset_diff_id, std::move(guard2), false); + ASSERT_FALSE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id1)); + EXPECT_FALSE(k_engine->pending_local_rowsets().contains(rowset_id2)); } // 1. commit twice with same rowset id // 2. should success TEST_F(TxnManagerTest, CommitTxnTwiceWithSameRowsetId) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - load_id, _rowset_same_id, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard1 {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard1), false); + ASSERT_TRUE(st.ok()) << st; + PendingRowsetGuard guard2 {rowset_id}; + st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, load_id, + _rowset_same_id, std::move(guard2), false); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. prepare twice should be success TEST_F(TxnManagerTest, PrepareNewTxnTwice) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; } // 1. txn could be rollbacked if it is not committed TEST_F(TxnManagerTest, RollbackNotCommittedTxn) { - Status status = - _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); - EXPECT_TRUE(status == Status::OK()); + auto st = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, _tablet_uid, load_id); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status != Status::OK()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_FALSE(st.ok()) << st; } // 1. txn could not be rollbacked if it is committed TEST_F(TxnManagerTest, RollbackCommittedTxn) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); - status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); - EXPECT_FALSE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + st = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, _tablet_uid); + ASSERT_FALSE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_TRUE(k_engine->pending_local_rowsets().contains(rowset_id)); } // 1. publish version success TEST_F(TxnManagerTest, PublishVersionSuccessful) { - Status status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, - _tablet_uid, load_id, _rowset, false); - EXPECT_TRUE(status == Status::OK()); + auto rowset_id = _rowset->rowset_id().to_string(); + PendingRowsetGuard guard {rowset_id}; + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; Version new_version(10, 11); TabletPublishStatistics stats; - status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, - new_version, &stats); - EXPECT_TRUE(status == Status::OK()); + st = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + new_version, &stats); + ASSERT_TRUE(st.ok()) << st; RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); - status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), - rowset_meta); - EXPECT_TRUE(status == Status::OK()); - EXPECT_TRUE(rowset_meta->rowset_id() == _rowset->rowset_id()); - // FIXME(Drogon): these is wrong when not real tablet exist - // EXPECT_EQ(rowset_meta->start_version(), 10); - // EXPECT_EQ(rowset_meta->end_version(), 11); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset->rowset_id(), rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset->rowset_id()); + EXPECT_FALSE(k_engine->pending_local_rowsets().contains(rowset_id)); + EXPECT_EQ(rowset_meta->start_version(), 10); Review Comment: warning: 10 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp EXPECT_EQ(rowset_meta->start_version(), 10); ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org