This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new df26475e1a5 [Enhancement](compaction) enable the compaction producer to generate multiple compaction tasks in a single run (#45411) (#46160) df26475e1a5 is described below commit df26475e1a5bf71c4c6f8880d9dd482ff29d6dfe Author: Luwei <lu...@selectdb.com> AuthorDate: Tue Dec 31 09:51:43 2024 +0800 [Enhancement](compaction) enable the compaction producer to generate multiple compaction tasks in a single run (#45411) (#46160) pick master #45411 --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/olap/olap_server.cpp | 14 +-- be/src/olap/tablet_manager.cpp | 52 ++++++++-- be/src/olap/tablet_manager.h | 2 +- be/test/olap/tablet_mgr_test.cpp | 217 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 276 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c9917afecb0..783d1d65922 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1349,6 +1349,8 @@ DEFINE_mBool(enable_pipeline_task_leakage_detect, "false"); DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false"); DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false"); +DEFINE_mInt32(compaction_num_per_round, "1"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 91b8f6bcb6d..303aecd2a23 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1419,6 +1419,8 @@ DECLARE_mInt32(lz4_compression_block_size); DECLARE_mBool(enable_pipeline_task_leakage_detect); DECLARE_Bool(force_regenerate_rowsetid_on_start_error); +DECLARE_mInt32(compaction_num_per_round); + // Enable sleep 5s between delete cumulative compaction. DECLARE_mBool(enable_sleep_between_delete_cumu_compaction); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index f3fdd552164..71c8fb38681 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -919,11 +919,11 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( continue; } - // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), + // Even if need_pick_tablet is false, we still need to call find_best_tablets_to_compaction(), // So that we can update the max_compaction_score metric. if (!data_dir->reach_capacity_limit(0)) { uint32_t disk_max_score = 0; - TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction( + std::vector<TabletSharedPtr> tablets = _tablet_manager->find_best_tablets_to_compaction( compaction_type, data_dir, compaction_type == CompactionType::CUMULATIVE_COMPACTION ? copied_cumu_map[data_dir] @@ -932,11 +932,13 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( int concurrent_num = get_concurrent_per_disk(disk_max_score, thread_per_disk); need_pick_tablet = need_generate_compaction_tasks( count, concurrent_num, compaction_type, copied_cumu_map[data_dir].empty()); - if (tablet != nullptr) { - if (need_pick_tablet) { - tablets_compaction.emplace_back(tablet); + for (const auto& tablet : tablets) { + if (tablet != nullptr) { + if (need_pick_tablet) { + tablets_compaction.emplace_back(tablet); + } + max_compaction_score = std::max(max_compaction_score, disk_max_score); } - max_compaction_score = std::max(max_compaction_score, disk_max_score); } } } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 23d2b11fc45..7f9ab2aad4c 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -728,7 +728,12 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { result->__set_tablet_stat_list(*local_cache); } -TabletSharedPtr TabletManager::find_best_tablet_to_compaction( +struct TabletScore { + TabletSharedPtr tablet_ptr; + int score; +}; + +std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( CompactionType compaction_type, DataDir* data_dir, const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& @@ -739,6 +744,9 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( uint32_t highest_score = 0; uint32_t compaction_score = 0; TabletSharedPtr best_tablet; + auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; }; + std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp); + auto handler = [&](const TabletSharedPtr& tablet_ptr) { if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) { LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id() @@ -794,23 +802,55 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( if (current_compaction_score < 5) { tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds()); } - if (current_compaction_score > highest_score) { - highest_score = current_compaction_score; - compaction_score = current_compaction_score; - best_tablet = tablet_ptr; + + if (config::compaction_num_per_round > 1) { + TabletScore ts; + ts.score = current_compaction_score; + ts.tablet_ptr = tablet_ptr; + if ((top_tablets.size() >= config::compaction_num_per_round && + current_compaction_score > top_tablets.top().score) || + top_tablets.size() < config::compaction_num_per_round) { + top_tablets.push(ts); + if (top_tablets.size() > config::compaction_num_per_round) { + top_tablets.pop(); + } + if (current_compaction_score > highest_score) { + highest_score = current_compaction_score; + compaction_score = current_compaction_score; + } + } + } else { + if (current_compaction_score > highest_score) { + highest_score = current_compaction_score; + compaction_score = current_compaction_score; + best_tablet = tablet_ptr; + } } }; for_each_tablet(handler, filter_all_tablets); + std::vector<TabletSharedPtr> picked_tablet; if (best_tablet != nullptr) { VLOG_CRITICAL << "Found the best tablet for compaction. " << "compaction_type=" << compaction_type_str << ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path() << ", compaction_score=" << compaction_score << ", highest_score=" << highest_score; + picked_tablet.emplace_back(std::move(best_tablet)); *score = compaction_score; } - return best_tablet; + + std::vector<TabletSharedPtr> reverse_top_tablets; + while (!top_tablets.empty()) { + reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr); + top_tablets.pop(); + } + + for (auto it = reverse_top_tablets.rbegin(); it != reverse_top_tablets.rend(); ++it) { + picked_tablet.emplace_back(*it); + } + + return picked_tablet; } Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 5f6b8f31bd8..f4f3765df65 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -73,7 +73,7 @@ public: // If `is_drop_table_or_partition` is true, we need to remove all remote rowsets in this tablet. Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool is_drop_table_or_partition); - TabletSharedPtr find_best_tablet_to_compaction( + std::vector<TabletSharedPtr> find_best_tablets_to_compaction( CompactionType compaction_type, DataDir* data_dir, const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index af5bf285fc8..830ca135123 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -32,10 +32,13 @@ #include "common/status.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/options.h" +#include "olap/rowset/beta_rowset.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" @@ -84,6 +87,7 @@ public: SAFE_DELETE(k_engine); ExecEnv::GetInstance()->set_storage_engine(nullptr); _tablet_mgr = nullptr; + config::compaction_num_per_round = 1; } StorageEngine* k_engine = nullptr; @@ -341,4 +345,217 @@ TEST_F(TabletMgrTest, GetRowsetId) { } } +TEST_F(TabletMgrTest, FindTabletWithCompact) { + auto create_tablet = [this](int64_t tablet_id, bool enable_single_compact, int rowset_size) { + std::vector<TColumn> cols; + TColumn col1; + col1.column_type.type = TPrimitiveType::SMALLINT; + col1.__set_column_name("col1"); + col1.__set_is_key(true); + cols.push_back(col1); + + TColumn col2; + col2.column_type.type = TPrimitiveType::INT; + col2.__set_column_name(SEQUENCE_COL); + col2.__set_is_key(false); + col2.__set_aggregation_type(TAggregationType::REPLACE); + cols.push_back(col2); + + TColumn col3; + col3.column_type.type = TPrimitiveType::INT; + col3.__set_column_name("v1"); + col3.__set_is_key(false); + col3.__set_aggregation_type(TAggregationType::REPLACE); + cols.push_back(col3); + + RuntimeProfile profile("CreateTablet"); + TTabletSchema tablet_schema; + tablet_schema.__set_short_key_column_count(1); + tablet_schema.__set_schema_hash(3333); + tablet_schema.__set_keys_type(TKeysType::UNIQUE_KEYS); + tablet_schema.__set_storage_type(TStorageType::COLUMN); + tablet_schema.__set_columns(cols); + tablet_schema.__set_sequence_col_idx(1); + tablet_schema.__set_enable_single_replica_compaction(enable_single_compact); + TCreateTabletReq create_tablet_req; + create_tablet_req.__set_tablet_schema(tablet_schema); + create_tablet_req.__set_tablet_id(tablet_id); + create_tablet_req.__set_version(1); + create_tablet_req.__set_replica_id(tablet_id * 10); + std::vector<DataDir*> data_dirs; + data_dirs.push_back(_data_dir); + Status create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs, &profile); + ASSERT_TRUE(create_st.ok()) << create_st; + + TabletSharedPtr tablet = _tablet_mgr->get_tablet(tablet_id); + ASSERT_TRUE(tablet); + // check dir exist + bool dir_exist = false; + Status exist_st = io::global_local_filesystem()->exists(tablet->tablet_path(), &dir_exist); + ASSERT_TRUE(exist_st.ok()) << exist_st; + ASSERT_TRUE(dir_exist); + // check meta has this tablet + TabletMetaSharedPtr new_tablet_meta(new TabletMeta()); + Status check_meta_st = + TabletMetaManager::get_meta(_data_dir, tablet_id, 3333, new_tablet_meta); + ASSERT_TRUE(check_meta_st.ok()) << check_meta_st; + // insert into rowset + auto create_rowset = [=, this](int64_t start, int64 end) { + auto rowset_meta = std::make_shared<RowsetMeta>(); + Version version(start, end); + rowset_meta->set_version(version); + rowset_meta->set_tablet_id(tablet->tablet_id()); + rowset_meta->set_tablet_uid(tablet->tablet_uid()); + rowset_meta->set_rowset_id(k_engine->next_rowset_id()); + return std::make_shared<BetaRowset>(tablet->tablet_schema(), tablet->tablet_path(), + std::move(rowset_meta)); + }; + auto st = tablet->init(); + ASSERT_TRUE(st.ok()) << st; + for (int i = 2; i <= rowset_size; ++i) { + auto rs = create_rowset(i, i); + auto st = tablet->add_inc_rowset(rs); + ASSERT_TRUE(st.ok()) << st; + } + }; + + int rowset_size = 5; + + // create 10 tablets + for (int64_t id = 1; id <= 10; ++id) { + create_tablet(id, false, rowset_size++); + } + + std::unordered_set<TabletSharedPtr> cumu_set; + std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>> + cumulative_compaction_policies; + cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_SIZE_BASED_POLICY); + cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_TIME_SERIES_POLICY); + uint32_t score = 0; + auto compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 1); + ASSERT_EQ(compact_tablets[0]->tablet_id(), 10); + ASSERT_EQ(score, 13); + + // create 10 tablets enable single compact + // 5 tablets do cumu compaction, 5 tablets do single compaction + // if BE_TEST is defined, tablet_id % 2 == 0 means that tablet needs to do single compact + for (int64_t id = 11; id <= 20; ++id) { + create_tablet(id, true, rowset_size++); + } + + compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 1); + ASSERT_EQ(compact_tablets[0]->tablet_id(), 20); + ASSERT_EQ(score, 23); + + create_tablet(21, false, rowset_size++); + + compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 1); + ASSERT_EQ(compact_tablets[0]->tablet_id(), 21); + ASSERT_EQ(score, 24); + + // drop all tablets + for (int64_t id = 1; id <= 21; ++id) { + Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false); + ASSERT_TRUE(drop_st.ok()) << drop_st; + } + + { + config::compaction_num_per_round = 10; + for (int64_t i = 1; i <= 100; ++i) { + create_tablet(10000 + i, false, i); + } + + compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 10); + int index = 0; + for (auto t : compact_tablets) { + ASSERT_EQ(t->tablet_id(), 10100 - index); + ASSERT_EQ(t->calc_compaction_score( + CompactionType::CUMULATIVE_COMPACTION, + cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]), + 99 - index); + index++; + } + config::compaction_num_per_round = 1; + // drop all tablets + for (int64_t id = 10001; id <= 10100; ++id) { + Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false); + ASSERT_TRUE(drop_st.ok()) << drop_st; + } + } + + { + config::compaction_num_per_round = 10; + for (int64_t i = 1; i <= 100; ++i) { + create_tablet(20000 + i, false, i); + } + + compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 10); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(compact_tablets[i]->tablet_id(), 20100 - i); + ASSERT_EQ(compact_tablets[i]->calc_compaction_score( + CompactionType::CUMULATIVE_COMPACTION, + cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]), + 99 - i); + } + + config::compaction_num_per_round = 1; + // drop all tablets + for (int64_t id = 20001; id <= 20100; ++id) { + Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false); + ASSERT_TRUE(drop_st.ok()) << drop_st; + } + + Status drop_st = _tablet_mgr->drop_tablet(20102, 20102 * 10, false); + ASSERT_TRUE(drop_st.ok()) << drop_st; + } + + { + config::compaction_num_per_round = 10; + for (int64_t i = 1; i <= 5; ++i) { + create_tablet(30000 + i, false, i + 5); + } + + compact_tablets = _tablet_mgr->find_best_tablets_to_compaction( + CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score, + cumulative_compaction_policies); + ASSERT_EQ(compact_tablets.size(), 5); + for (int i = 0; i < 5; ++i) { + ASSERT_EQ(compact_tablets[i]->tablet_id(), 30000 + 5 - i); + ASSERT_EQ(compact_tablets[i]->calc_compaction_score( + CompactionType::CUMULATIVE_COMPACTION, + cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]), + 9 - i); + } + + config::compaction_num_per_round = 1; + // drop all tablets + for (int64_t id = 30001; id <= 30005; ++id) { + Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false); + ASSERT_TRUE(drop_st.ok()) << drop_st; + } + } + + Status trash_st = _tablet_mgr->start_trash_sweep(); + ASSERT_TRUE(trash_st.ok()) << trash_st; +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org