This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ab186a60ce [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631 ab186a60ce is described below commit ab186a60ce80d4f168b503cdba1b3ab740067d59 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Tue Jan 10 08:32:15 2023 +0800 [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631 Tablet::version_for_delete_predicate has to traverse all rowset metas in the tablet meta, which has O(N) complexity; however, we can directly judge whether a rowset is a delete rowset via RowsetMeta::has_delete_predicate, which has O(1) complexity. As we no longer call Tablet::version_for_delete_predicate when picking input rowsets for compaction, we can reduce the critical section of Tablet::_meta_lock. --- be/src/olap/base_compaction.cpp | 5 +-- be/src/olap/compaction.cpp | 2 +- be/src/olap/cumulative_compaction.cpp | 5 +-- be/src/olap/cumulative_compaction_policy.cpp | 18 ++------- be/src/olap/cumulative_compaction_policy.h | 9 ----- be/src/olap/tablet.cpp | 39 ++++++++++++------- be/src/olap/tablet.h | 8 +--- be/test/olap/cumulative_compaction_policy_test.cpp | 45 +++++----------------- 8 files changed, 42 insertions(+), 89 deletions(-) diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index ea4188788a..36fdb193de 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -106,10 +106,7 @@ void BaseCompaction::_filter_input_rowset() { } Status BaseCompaction::pick_rowsets_to_compact() { - _input_rowsets.clear(); - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets, rdlock); - std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); + _input_rowsets = _tablet->pick_candidate_rowsets_to_base_compaction(); RETURN_NOT_OK(check_version_continuity(_input_rowsets)); 
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets)); _filter_input_rowset(); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 2089461e12..e2c7473f5f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -202,7 +202,7 @@ bool Compaction::handle_ordered_data_compaction() { // has a delete version, use original compaction if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { for (auto rowset : _input_rowsets) { - if (_tablet->version_for_delete_predicate(rowset->version())) { + if (rowset->rowset_meta()->has_delete_predicate()) { return false; } } diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 6feb8c5c1c..765790859c 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -96,10 +96,7 @@ Status CumulativeCompaction::execute_compact_impl() { } Status CumulativeCompaction::pick_rowsets_to_compact() { - std::vector<RowsetSharedPtr> candidate_rowsets; - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); - + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); if (candidate_rowsets.empty()) { return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(); } diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 3ccbe0eea7..a097ce11b2 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -85,7 +85,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point( break; } - bool is_delete = tablet->version_for_delete_predicate(rs->version()); + bool is_delete = rs->has_delete_predicate(); // break the loop if segments in this rowset is overlapping. 
if (!is_delete && rs->is_segments_overlapping()) { @@ -245,10 +245,9 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( int transient_size = 0; *compaction_score = 0; int64_t total_size = 0; - for (size_t i = 0; i < candidate_rowsets.size(); ++i) { - RowsetSharedPtr rowset = candidate_rowsets[i]; + for (auto& rowset : candidate_rowsets) { // check whether this rowset is delete version - if (tablet->version_for_delete_predicate(rowset->version())) { + if (rowset->rowset_meta()->has_delete_predicate()) { *last_delete_version = rowset->version(); if (!input_rowsets->empty()) { // we meet a delete version, and there were other versions before. @@ -344,17 +343,6 @@ int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { return 0; } -void CumulativeCompactionPolicy::pick_candidate_rowsets( - const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map, - int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) { - for (const auto& [version, rs] : rs_version_map) { - if (version.first >= cumulative_point && rs->is_local()) { - candidate_rowsets->push_back(rs); - } - } - std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator); -} - std::shared_ptr<CumulativeCompactionPolicy> CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() { return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy()); diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index fabd9344c1..16ac7fc549 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -58,15 +58,6 @@ public: Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t current_cumulative_point, uint32_t* score) = 0; - /// This function implements the policy which represents how to pick the candidate rowsets for compaction. 
- /// This base class gives a unified implementation. Its derived classes also can override this function each other. - /// param rs_version_map, mapping from version to rowset - /// param cumulative_point, current cumulative point of tablet - /// return candidate_rowsets, the container of candidate rowsets - virtual void pick_candidate_rowsets( - const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map, - int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets); - /// Pick input rowsets from candidate rowsets for compaction. This function is pure virtual function. /// Its implementation depends on concrete compaction policy. /// param candidate_rowsets, the candidate_rowsets vector container to pick input rowsets diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index dae86a06e8..a67ace80a5 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1125,25 +1125,36 @@ TabletInfo Tablet::get_tablet_info() const { return TabletInfo(tablet_id(), schema_hash(), tablet_uid()); } -void Tablet::pick_candidate_rowsets_to_cumulative_compaction( - std::vector<RowsetSharedPtr>* candidate_rowsets, - std::shared_lock<std::shared_mutex>& /* meta lock*/) { +std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compaction() { + std::vector<RowsetSharedPtr> candidate_rowsets; if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { - return; + return candidate_rowsets; + } + { + std::shared_lock rlock(_meta_lock); + for (const auto& [version, rs] : _rs_version_map) { + if (version.first >= _cumulative_point && rs->is_local()) { + candidate_rowsets.push_back(rs); + } + } } - _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point, - candidate_rowsets); + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + return candidate_rowsets; } -void Tablet::pick_candidate_rowsets_to_base_compaction( - vector<RowsetSharedPtr>* candidate_rowsets, - 
std::shared_lock<std::shared_mutex>& /* meta lock*/) { - for (auto& it : _rs_version_map) { - // Do compaction on local rowsets only. - if (it.first.first < _cumulative_point && it.second->is_local()) { - candidate_rowsets->push_back(it.second); +std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() { + std::vector<RowsetSharedPtr> candidate_rowsets; + { + std::shared_lock rlock(_meta_lock); + for (const auto& [version, rs] : _rs_version_map) { + // Do compaction on local rowsets only. + if (version.first < _cumulative_point && rs->is_local()) { + candidate_rowsets.push_back(rs); + } } } + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + return candidate_rowsets; } // For http compaction action @@ -1173,7 +1184,7 @@ void Tablet::get_compaction_status(std::string* json_result) { delete_flags.reserve(rowsets.size()); for (auto& rs : rowsets) { - delete_flags.push_back(version_for_delete_predicate(rs->version())); + delete_flags.push_back(rs->rowset_meta()->has_delete_predicate()); } // get snapshot version path json_doc _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 3adfdeee2a..c82fa461a1 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -225,12 +225,8 @@ public: TabletInfo get_tablet_info() const; - void pick_candidate_rowsets_to_cumulative_compaction( - std::vector<RowsetSharedPtr>* candidate_rowsets, - std::shared_lock<std::shared_mutex>& /* meta lock*/); - void pick_candidate_rowsets_to_base_compaction( - std::vector<RowsetSharedPtr>* candidate_rowsets, - std::shared_lock<std::shared_mutex>& /* meta lock*/); + std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction(); + std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); void calculate_cumulative_point(); // TODO(ygl): diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp index 5be78e9190..f4760b9bf0 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -400,10 +400,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); - + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); EXPECT_EQ(3, candidate_rowsets.size()); } @@ -419,10 +416,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base) _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); - + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); EXPECT_EQ(3, candidate_rowsets.size()); } @@ -438,10 +432,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -468,10 +459,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - 
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -498,10 +486,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -528,10 +513,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -558,10 +540,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -588,10 +567,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min _tablet->init(); _tablet->calculate_cumulative_point(); - 
std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -618,10 +594,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) { _tablet->init(); _tablet->calculate_cumulative_point(); - std::vector<RowsetSharedPtr> candidate_rowsets; - - std::shared_lock rdlock(_tablet->get_header_lock()); - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org