This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c8aca786315 [enhancement](compaction) optimize the cpu consumption of the compaction task producer thread (#40152) c8aca786315 is described below commit c8aca786315808d10cbe477e2113da8af10a11c0 Author: Luwei <814383...@qq.com> AuthorDate: Mon Oct 14 09:56:07 2024 +0800 [enhancement](compaction) optimize the cpu consumption of the compaction task producer thread (#40152) Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> --- be/src/common/config.cpp | 4 +- be/src/common/config.h | 2 + be/src/olap/compaction.cpp | 1 + be/src/olap/tablet.cpp | 47 +++++- be/src/olap/tablet.h | 25 +++- be/src/olap/tablet_manager.cpp | 19 ++- be/test/olap/compaction_score_test.cpp | 158 +++++++++++++++++++++ be/test/olap/cumulative_compaction_policy_test.cpp | 8 +- ...mulative_compaction_time_series_policy_test.cpp | 12 +- be/test/olap/tablet_mgr_test.cpp | 6 +- 10 files changed, 256 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5527ab07885..071eac13e81 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -396,7 +396,7 @@ DEFINE_mInt64(base_compaction_max_compaction_score, "20"); DEFINE_mDouble(base_compaction_min_data_ratio, "0.3"); DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024"); -DEFINE_Bool(enable_skip_tablet_compaction, "true"); +DEFINE_Bool(enable_skip_tablet_compaction, "false"); // output rowset of cumulative compaction total disk size exceed this config size, // this rowset will be given to base compaction, unit is m byte. DEFINE_mInt64(compaction_promotion_size_mbytes, "1024"); @@ -1348,6 +1348,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144"); DEFINE_mBool(enable_pipeline_task_leakage_detect, "false"); +DEFINE_mInt32(check_score_rounds_num, "1000"); + DEFINE_Int32(query_cache_size, "512"); // clang-format off diff --git a/be/src/common/config.h b/be/src/common/config.h index e2789913703..585c4dc45cc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1433,6 +1433,8 @@ DECLARE_mInt32(lz4_compression_block_size); DECLARE_mBool(enable_pipeline_task_leakage_detect); +DECLARE_mInt32(check_score_rounds_num); + // MB DECLARE_Int32(query_cache_size); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8b37e9ba174..84830502366 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1093,6 +1093,7 @@ Status CompactionMixin::modify_rowsets() { LOG(WARNING) << "failed to remove old version delete bitmap, st: " << st; } } + return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 51eabe5495e..1e8518b47dc 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -57,7 +57,6 @@ #include "agent/utils.h" #include "common/config.h" #include "common/consts.h" -#include "common/exception.h" #include "common/logging.h" #include "common/signal_handler.h" #include "common/status.h" @@ -489,6 +488,7 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) { RETURN_IF_ERROR(_tablet_meta->add_rs_meta(rowset->rowset_meta())); _rs_version_map[rowset->version()] = rowset; _timestamped_version_tracker.add_version(rowset->version()); + add_compaction_score(rowset->rowset_meta()->get_compaction_score()); std::vector<RowsetSharedPtr> rowsets_to_delete; // yiguolei: temp code, should remove the rowset contains by this rowset @@ -594,6 +594,17 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, } } } + + int32_t add_score = 0; + for (auto rs : to_add) { + add_score += rs->rowset_meta()->get_compaction_score(); + } + int32_t sub_score = 0; + for (auto rs : to_delete) { + sub_score += rs->rowset_meta()->get_compaction_score(); + } + add_compaction_score(add_score - sub_score); + return Status::OK(); } @@ -668,6 +679,9 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { _timestamped_version_tracker.add_version(rowset->version()); ++_newly_created_rowset_num; + + add_compaction_score(rowset->rowset_meta()->get_compaction_score()); + return Status::OK(); } @@ -983,17 +997,41 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type) return tablet_state() == TABLET_RUNNING || tablet_state() == TABLET_NOTREADY; } -uint32_t Tablet::calc_compaction_score( +uint32_t Tablet::calc_compaction_score() { + if (_score_check_cnt++ % config::check_score_rounds_num != 0) { + std::shared_lock rdlock(_meta_lock); + if (_compaction_score > 0) { + return _compaction_score; + } + } + + { + // Need meta lock, because it will iterator "all_rs_metas" of tablet meta. + std::shared_lock rdlock(_meta_lock); + int32_t score = get_real_compaction_score(); + if (_compaction_score > 0 && _compaction_score != score) { + LOG(WARNING) << "cumu cache score not equal real score, cache score; " + << _compaction_score << ", real score: " << score + << ", tablet: " << tablet_id(); + } + _compaction_score = score; + return score; + } +} + +bool Tablet::suitable_for_compaction( CompactionType compaction_type, std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) { // Need meta lock, because it will iterator "all_rs_metas" of tablet meta. std::shared_lock rdlock(_meta_lock); + int32_t score = -1; if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - return _calc_cumulative_compaction_score(cumulative_compaction_policy); + score = _calc_cumulative_compaction_score(cumulative_compaction_policy); } else { DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); - return _calc_base_compaction_score(); + score = _calc_base_compaction_score(); } + return score > 0; } uint32_t Tablet::calc_cold_data_compaction_score() const { @@ -1790,6 +1828,7 @@ void Tablet::execute_compaction(CompactionMixin& compaction) { watch.start(); Status res = [&]() { RETURN_IF_CATCH_EXCEPTION({ return compaction.execute_compact(); }); }(); + if (!res.ok()) [[unlikely]] { set_last_failure_time(this, compaction, UnixMillis()); LOG(WARNING) << "failed to do " << compaction.compaction_name() diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 33253e82ced..d7d10978d5a 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -221,10 +221,12 @@ public: // operation for compaction bool can_do_compaction(size_t path_hash, CompactionType compaction_type); - uint32_t calc_compaction_score( + bool suitable_for_compaction( CompactionType compaction_type, std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy); + uint32_t calc_compaction_score(); + // This function to find max continuous version from the beginning. // For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet, then 3 is target. // 3 will be saved in "version", and 7 will be saved in "max_version", if max_version != nullptr @@ -482,6 +484,24 @@ public: inline bool is_full_compaction_running() const { return _is_full_compaction_running; } void clear_cache() override; + int32_t get_compaction_score() const { return _compaction_score; } + + void set_compaction_score(int32_t compaction_score) { _compaction_score = compaction_score; } + + void add_compaction_score(int32_t score) { + if (_compaction_score < 0) { + return; + } + _compaction_score += score; + } + + void minus_compaction_score(int32_t score) { + if (_compaction_score < 0) { + return; + } + _compaction_score -= score; + } + private: Status _init_once_action(); bool _contains_rowset(const RowsetId rowset_id); @@ -608,6 +628,9 @@ private: std::shared_ptr<const VersionWithTime> _visible_version; std::atomic_bool _is_full_compaction_running = false; + + int32_t _compaction_score = -1; + int32_t _score_check_cnt = 0; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 468a6b2fb12..64eb408c9e3 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -797,8 +797,7 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( } auto cumulative_compaction_policy = all_cumulative_compaction_policies.at( tablet_ptr->tablet_meta()->compaction_policy()); - uint32_t current_compaction_score = - tablet_ptr->calc_compaction_score(compaction_type, cumulative_compaction_policy); + uint32_t current_compaction_score = tablet_ptr->calc_compaction_score(); if (current_compaction_score < 5) { tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds()); } @@ -806,14 +805,22 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( // tablet should do single compaction if (current_compaction_score > single_compact_highest_score && tablet_ptr->should_fetch_from_peer()) { - single_compact_highest_score = current_compaction_score; - best_single_compact_tablet = tablet_ptr; + bool ret = tablet_ptr->suitable_for_compaction(compaction_type, + cumulative_compaction_policy); + if (ret) { + single_compact_highest_score = current_compaction_score; + best_single_compact_tablet = tablet_ptr; + } } // tablet should do cumu or base compaction if (current_compaction_score > highest_score && !tablet_ptr->should_fetch_from_peer()) { - highest_score = current_compaction_score; - best_tablet = tablet_ptr; + bool ret = tablet_ptr->suitable_for_compaction(compaction_type, + cumulative_compaction_policy); + if (ret) { + highest_score = current_compaction_score; + best_tablet = tablet_ptr; + } } }; diff --git a/be/test/olap/compaction_score_test.cpp b/be/test/olap/compaction_score_test.cpp new file mode 100644 index 00000000000..de4e5cdde0a --- /dev/null +++ b/be/test/olap/compaction_score_test.cpp @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gmock/gmock-actions.h> +#include <gmock/gmock-matchers.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <filesystem> +#include <memory> + +#include "common/status.h" +#include "cpp/sync_point.h" +#include "gtest/gtest_pred_impl.h" +#include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/data_dir.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/threadpool.h" + +namespace doris { +using namespace config; + +class CompactionScoreTest : public testing::Test { +public: + virtual void SetUp() { + _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp"; + auto st = io::global_local_filesystem()->delete_directory(_engine_data_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(_engine_data_path); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE( + io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok()); + + EngineOptions options; + options.backend_uid = UniqueId::gen_uid(); + _storage_engine = std::make_unique<StorageEngine>(options); + _data_dir = std::make_unique<DataDir>(*_storage_engine, _engine_data_path, 100000000); + static_cast<void>(_data_dir->init()); + } + + virtual void TearDown() { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } + + RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared<RowsetMeta>(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_rowset_id(_storage_engine->next_rowset_id()); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; + } + + std::unique_ptr<StorageEngine> _storage_engine; + std::string _engine_data_path; + std::unique_ptr<DataDir> _data_dir; +}; + +TEST_F(CompactionScoreTest, TestCompactionScore) { + /* + auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_base_compaction_thread_pool); + EXPECT_TRUE(st.OK()); + st = ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_cumu_compaction_thread_pool); + EXPECT_TRUE(st.OK()); + */ + + /* + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("olap_server::execute_compaction", [](auto&& values) { + std::this_thread::sleep_for(std::chrono::seconds(10)); + bool* pred = try_any_cast<bool*>(values.back()); + *pred = true; + }); + */ + + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, _data_dir.get(), + CUMULATIVE_SIZE_BASED_POLICY)); + Status st = tablet->init(); + EXPECT_TRUE(st.OK()); + + for (int i = 2; i < 10; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 102400); + st = tablet->add_inc_rowset(rs); + EXPECT_TRUE(st.OK()); + } + EXPECT_EQ(tablet->get_compaction_score(), -1); + EXPECT_EQ(tablet->calc_compaction_score(), 8); + EXPECT_EQ(tablet->get_real_compaction_score(), 8); + + for (int i = 10; i < 30; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 102400); + st = tablet->add_inc_rowset(rs); + EXPECT_TRUE(st.OK()); + } + EXPECT_EQ(tablet->get_compaction_score(), 28); + EXPECT_EQ(tablet->calc_compaction_score(), 28); + EXPECT_EQ(tablet->get_real_compaction_score(), 28); + + std::vector<RowsetSharedPtr> input_rowsets = tablet->get_snapshot_rowset(); + for (auto it = input_rowsets.begin(); it != input_rowsets.end();) { + if ((*it)->start_version() < 10) { + it = input_rowsets.erase(it); + } else { + it++; + } + } + + RowsetSharedPtr rs = create_rowset({10, 29}, 1, false, 102400); + std::vector<RowsetSharedPtr> output_rowsets; + output_rowsets.push_back(rs); + st = tablet->modify_rowsets(output_rowsets, input_rowsets, true); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(tablet->get_compaction_score(), 9); + EXPECT_EQ(tablet->calc_compaction_score(), 9); + EXPECT_EQ(tablet->get_real_compaction_score(), 9); +} + +} // namespace doris diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index e4775031c28..1fc735296ce 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -350,8 +350,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( CUMULATIVE_SIZE_BASED_POLICY); - const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, - cumulative_compaction_policy); + const uint32_t score = _tablet->calc_compaction_score(); EXPECT_EQ(15, score); } @@ -372,10 +371,9 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( CUMULATIVE_SIZE_BASED_POLICY); - const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, - cumulative_compaction_policy); + const uint32_t score = _tablet->calc_compaction_score(); - EXPECT_EQ(7, score); + EXPECT_EQ(9, score); } TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_big_base) { diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp index 3e88e424e43..79e540fca77 100644 --- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp @@ -401,10 +401,10 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( CUMULATIVE_TIME_SERIES_POLICY); - const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, - cumulative_compaction_policy); + bool ret = _tablet->suitable_for_compaction(CompactionType::CUMULATIVE_COMPACTION, + cumulative_compaction_policy); - EXPECT_EQ(9, score); + EXPECT_EQ(true, ret); } TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_score_big_rowset) { @@ -423,10 +423,10 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( CUMULATIVE_TIME_SERIES_POLICY); - const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, - cumulative_compaction_policy); + bool ret = _tablet->suitable_for_compaction(CompactionType::CUMULATIVE_COMPACTION, + cumulative_compaction_policy); - EXPECT_EQ(5, score); + EXPECT_EQ(true, ret); } TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_candidate_rowsets) { diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index c3cd74b55ed..1bcdcdf45c6 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -436,7 +436,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) { cumulative_compaction_policies); ASSERT_EQ(compact_tablets.size(), 1); ASSERT_EQ(compact_tablets[0]->tablet_id(), 10); - ASSERT_EQ(score, 13); + ASSERT_EQ(score, 14); // create 10 tablets enable single compact // 5 tablets do cumu compaction, 5 tablets do single compaction @@ -451,7 +451,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) { ASSERT_EQ(compact_tablets.size(), 2); ASSERT_EQ(compact_tablets[0]->tablet_id(), 19); ASSERT_EQ(compact_tablets[1]->tablet_id(), 20); - ASSERT_EQ(score, 23); + ASSERT_EQ(score, 24); create_tablet(21, false, rowset_size++); @@ -460,7 +460,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) { cumulative_compaction_policies); ASSERT_EQ(compact_tablets.size(), 1); ASSERT_EQ(compact_tablets[0]->tablet_id(), 21); - ASSERT_EQ(score, 24); + ASSERT_EQ(score, 25); // drop all tablets for (int64_t id = 1; id <= 20; ++id) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org