This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2c3a96ed97b [enhancement](compaction) adjust compaction concurrency based on compaction score and workload (#36672) 2c3a96ed97b is described below commit 2c3a96ed97bef5c9653424fdc61b7b40ff8119a3 Author: Luwei <814383...@qq.com> AuthorDate: Wed Jun 26 22:00:23 2024 +0800 [enhancement](compaction) adjust compaction concurrency based on compaction score and workload (#36672) 1. Resolved the issue where the priority queue did not reserve slots for cumulative compaction. 2. When considering compaction task priorities, introduced metrics for CPU and memory usage rates. When the compaction score is low, and CPU or memory usage is high, reduce the number of compaction tasks generated and allocate CPU and memory resources to queries or load. 3. Integrated the logic of the priority queue and concurrency control together, removing the previous priority code. --- be/src/common/config.cpp | 6 ++- be/src/common/config.h | 2 +- be/src/olap/olap_server.cpp | 85 +++++++++++++++++++++++------------------- be/src/olap/storage_engine.cpp | 20 ---------- be/src/util/system_metrics.cpp | 4 ++ be/src/util/system_metrics.h | 2 + 6 files changed, 57 insertions(+), 62 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5eb0e8d26ba..4460e477c8f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -437,6 +437,8 @@ DEFINE_Validator(compaction_task_num_per_disk, [](const int config) -> bool { return config >= 2; }); DEFINE_Validator(compaction_task_num_per_fast_disk, [](const int config) -> bool { return config >= 2; }); +DEFINE_Validator(low_priority_compaction_task_num_per_disk, + [](const int config) -> bool { return config >= 2; }); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. 
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); @@ -458,8 +460,8 @@ DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400"); // Compaction priority schedule DEFINE_mBool(enable_compaction_priority_scheduling, "true"); -DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "1"); -DEFINE_mDouble(low_priority_tablet_version_num_ratio, "0.7"); +DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "2"); +DEFINE_mInt32(low_priority_compaction_score_threshold, "200"); // Thread count to do tablet meta checkpoint, -1 means use the data directories count. DEFINE_Int32(max_meta_checkpoint_threads, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 592e96d1dc0..dbf18002704 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -507,7 +507,7 @@ DECLARE_mInt64(pick_rowset_to_compact_interval_sec); // Compaction priority schedule DECLARE_mBool(enable_compaction_priority_scheduling); DECLARE_mInt32(low_priority_compaction_task_num_per_disk); -DECLARE_mDouble(low_priority_tablet_version_num_ratio); +DECLARE_mInt32(low_priority_compaction_score_threshold); // Thread count to do tablet meta checkpoint, -1 means use the data directories count. DECLARE_Int32(max_meta_checkpoint_threads); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 67d171f1a22..ec4e5843b26 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -845,6 +845,42 @@ void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* response->mutable_status()->set_status_code(0); } +bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type, + bool all_base) { + if (count >= thread_per_disk) { + // Return if no available slot + return false; + } else if (count >= thread_per_disk - 1) { + // Only one slot left, check if it can be assigned to base compaction task. 
+ if (compaction_type == CompactionType::BASE_COMPACTION) { + if (all_base) { + return false; + } + } + } + return true; +} + +int get_concurrent_per_disk(int max_score, int thread_per_disk) { + if (!config::enable_compaction_priority_scheduling) { + return thread_per_disk; + } + + double load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min(); + int num_cores = doris::CpuInfo::num_cores(); + bool cpu_usage_high = load_average > num_cores * 0.8; + + auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); + bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; + + if (max_score <= config::low_priority_compaction_score_threshold && + (cpu_usage_high || memory_usage_high)) { + return config::low_priority_compaction_task_num_per_disk; + } + + return thread_per_disk; +} + std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) { _update_cumulative_compaction_policy(); @@ -874,22 +910,11 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( int count = copied_cumu_map[data_dir].size() + copied_base_map[data_dir].size(); int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk : config::compaction_task_num_per_disk; - if (count >= thread_per_disk) { - // Return if no available slot - need_pick_tablet = false; - if (!check_score) { - continue; - } - } else if (count >= thread_per_disk - 1) { - // Only one slot left, check if it can be assigned to base compaction task. 
- if (compaction_type == CompactionType::BASE_COMPACTION) { - if (copied_cumu_map[data_dir].empty()) { - need_pick_tablet = false; - if (!check_score) { - continue; - } - } - } + + need_pick_tablet = need_generate_compaction_tasks(count, thread_per_disk, compaction_type, + copied_cumu_map[data_dir].empty()); + if (!need_pick_tablet && !check_score) { + continue; } // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), @@ -902,6 +927,9 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( ? copied_cumu_map[data_dir] : copied_base_map[data_dir], &disk_max_score, _cumulative_compaction_policies); + int concurrent_num = get_concurrent_per_disk(disk_max_score, thread_per_disk); + need_pick_tablet = need_generate_compaction_tasks( + count, concurrent_num, compaction_type, copied_cumu_map[data_dir].empty()); for (const auto& tablet : tablets) { if (tablet != nullptr) { if (need_pick_tablet) { @@ -1007,17 +1035,6 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, int64_t permits = 0; Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, compaction, permits); - bool is_low_priority_task = [&]() { - // Can add more strategies to determine whether a task is a low priority task in the future - if (!config::enable_compaction_priority_scheduling) { - return false; - } - if (tablet->version_count() >= - (config::max_tablet_version_num * config::low_priority_tablet_version_num_ratio)) { - return false; - } - return !force; - }(); if (st.ok() && permits > 0) { if (!force) { _permit_limiter.request(permits); @@ -1027,18 +1044,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, ? 
_cumu_compaction_thread_pool : _base_compaction_thread_pool; auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction), - compaction_type, permits, force, is_low_priority_task, - this]() { - if (is_low_priority_task && !_increase_low_priority_task_nums(tablet->data_dir())) { - VLOG_DEBUG << "skip low priority compaction task for tablet: " - << tablet->tablet_id(); - // Todo: push task back - } else { - tablet->execute_compaction(*compaction); - if (is_low_priority_task) { - _decrease_low_priority_task_nums(tablet->data_dir()); - } - } + compaction_type, permits, force, this]() { + tablet->execute_compaction(*compaction); if (!force) { _permit_limiter.release(permits); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index cbae6515bb6..5d50bb5f4df 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1530,26 +1530,6 @@ Status StorageEngine::_persist_broken_paths() { return Status::OK(); } -bool StorageEngine::_increase_low_priority_task_nums(DataDir* dir) { - if (!config::enable_compaction_priority_scheduling) { - return true; - } - std::lock_guard l(_low_priority_task_nums_mutex); - if (_low_priority_task_nums[dir] < config::low_priority_compaction_task_num_per_disk) { - _low_priority_task_nums[dir]++; - return true; - } - return false; -} - -void StorageEngine::_decrease_low_priority_task_nums(DataDir* dir) { - if (config::enable_compaction_priority_scheduling) { - std::lock_guard l(_low_priority_task_nums_mutex); - _low_priority_task_nums[dir]--; - DCHECK(_low_priority_task_nums[dir] >= 0); - } -} - int CreateTabletIdxCache::get_index(const std::string& key) { auto* lru_handle = lookup(key); if (lru_handle) { diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index ee7db9494c2..e2fd2102419 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -869,6 +869,10 @@ void SystemMetrics::get_disks_io_time(std::map<std::string, 
int64_t>* map) { } } +double SystemMetrics::get_load_average_1_min() { + return _load_average_metrics->load_average_1_minutes->value(); +} + void SystemMetrics::get_network_traffic(std::map<std::string, int64_t>* send_map, std::map<std::string, int64_t>* rcv_map) { send_map->clear(); diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 28db964296d..c72ba369301 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -57,6 +57,8 @@ public: const std::map<std::string, int64_t>& lst_rcv_map, int64_t interval_sec, int64_t* send_rate, int64_t* rcv_rate); + double get_load_average_1_min(); + void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& lst_value, int64_t interval_sec); void update_max_network_send_bytes_rate(int64_t max_send_bytes_rate); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org