This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c3a96ed97b [enhancement](compaction) adjust compaction concurrency based on compaction score and workload (#36672)
2c3a96ed97b is described below

commit 2c3a96ed97bef5c9653424fdc61b7b40ff8119a3
Author: Luwei <814383...@qq.com>
AuthorDate: Wed Jun 26 22:00:23 2024 +0800

    [enhancement](compaction) adjust compaction concurrency based on compaction score and workload (#36672)
    
    1. Fixed an issue where the priority queue did not reserve a slot for
    cumulative compaction.
    
    2. Compaction task prioritization now takes CPU load (1-minute load
    average) and memory usage into account. When the compaction score is low
    and CPU or memory usage is high, fewer compaction tasks are generated so
    that CPU and memory resources are left for queries and loads.
    
    3. Merged the priority-queue logic into the per-disk concurrency control
    and removed the previous low-priority scheduling code.
---
 be/src/common/config.cpp       |  6 ++-
 be/src/common/config.h         |  2 +-
 be/src/olap/olap_server.cpp    | 85 +++++++++++++++++++++++-------------------
 be/src/olap/storage_engine.cpp | 20 ----------
 be/src/util/system_metrics.cpp |  4 ++
 be/src/util/system_metrics.h   |  2 +
 6 files changed, 57 insertions(+), 62 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5eb0e8d26ba..4460e477c8f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -437,6 +437,8 @@ DEFINE_Validator(compaction_task_num_per_disk,
                  [](const int config) -> bool { return config >= 2; });
 DEFINE_Validator(compaction_task_num_per_fast_disk,
                  [](const int config) -> bool { return config >= 2; });
+DEFINE_Validator(low_priority_compaction_task_num_per_disk,
+                 [](const int config) -> bool { return config >= 2; });
 
 // How many rounds of cumulative compaction for each round of base compaction 
when compaction tasks generation.
 DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, 
"9");
@@ -458,8 +460,8 @@ DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400");
 
 // Compaction priority schedule
 DEFINE_mBool(enable_compaction_priority_scheduling, "true");
-DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "1");
-DEFINE_mDouble(low_priority_tablet_version_num_ratio, "0.7");
+DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "2");
+DEFINE_mInt32(low_priority_compaction_score_threshold, "200");
 
 // Thread count to do tablet meta checkpoint, -1 means use the data 
directories count.
 DEFINE_Int32(max_meta_checkpoint_threads, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 592e96d1dc0..dbf18002704 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -507,7 +507,7 @@ DECLARE_mInt64(pick_rowset_to_compact_interval_sec);
 // Compaction priority schedule
 DECLARE_mBool(enable_compaction_priority_scheduling);
 DECLARE_mInt32(low_priority_compaction_task_num_per_disk);
-DECLARE_mDouble(low_priority_tablet_version_num_ratio);
+DECLARE_mInt32(low_priority_compaction_score_threshold);
 
 // Thread count to do tablet meta checkpoint, -1 means use the data 
directories count.
 DECLARE_Int32(max_meta_checkpoint_threads);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 67d171f1a22..ec4e5843b26 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -845,6 +845,42 @@ void StorageEngine::get_tablet_rowset_versions(const 
PGetTabletVersionsRequest*
     response->mutable_status()->set_status_code(0);
 }
 
+// Decide whether one more compaction task may be generated for a disk.
+//   count           : number of tasks the caller counted for the disk
+//                     (cumulative map size + base map size at the call site).
+//   thread_per_disk : concurrency limit to enforce for the disk.
+//   compaction_type : type of the task about to be generated.
+//   all_base        : true when the caller's cumulative map for the disk is
+//                     empty, i.e. every counted task is a base compaction.
+// When exactly one slot is left, it is kept for cumulative compaction instead
+// of being handed to yet another base compaction.
+bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type,
+                                    bool all_base) {
+    // Return if no available slot
+    if (count >= thread_per_disk) {
+        return false;
+    }
+    // Past the guard above, this is true only when a single slot is free.
+    const bool only_one_slot_left = count >= thread_per_disk - 1;
+    const bool is_base_task = compaction_type == CompactionType::BASE_COMPACTION;
+    return !(only_one_slot_left && is_base_task && all_base);
+}
+
+// Return how many compaction tasks may run concurrently on one disk.
+//
+// When config::enable_compaction_priority_scheduling is on, the disk's best
+// compaction score (max_score) is at or below
+// config::low_priority_compaction_score_threshold, and the host is busy
+// (1-minute load average above 80% of the core count, or process memory above
+// 80% of the soft memory limit), the limit is lowered to
+// config::low_priority_compaction_task_num_per_disk, never above thread_per_disk.
+// In every other case thread_per_disk is returned unchanged.
+int get_concurrent_per_disk(int max_score, int thread_per_disk) {
+    if (!config::enable_compaction_priority_scheduling) {
+        return thread_per_disk;
+    }
+
+    // NOTE(review): system_metrics() can presumably be null when system metrics
+    // are not initialized; treat the load as 0 rather than dereferencing null.
+    double load_average = 0;
+    auto metrics = DorisMetrics::instance()->system_metrics();
+    if (metrics != nullptr) {
+        load_average = metrics->get_load_average_1_min();
+    }
+    int num_cores = doris::CpuInfo::num_cores();
+    bool cpu_usage_high = load_average > num_cores * 0.8;
+
+    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
+    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
+
+    if (max_score <= config::low_priority_compaction_score_threshold &&
+        (cpu_usage_high || memory_usage_high)) {
+        // The config is mutable (mInt32): read it once, and clamp it so the
+        // low-priority limit can only reduce concurrency, never raise it.
+        int low_priority_num = config::low_priority_compaction_task_num_per_disk;
+        return low_priority_num < thread_per_disk ? low_priority_num : thread_per_disk;
+    }
+
+    return thread_per_disk;
+}
+
 std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
         CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool 
check_score) {
     _update_cumulative_compaction_policy();
@@ -874,22 +910,11 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
         int count = copied_cumu_map[data_dir].size() + 
copied_base_map[data_dir].size();
         int thread_per_disk = data_dir->is_ssd_disk() ? 
config::compaction_task_num_per_fast_disk
                                                       : 
config::compaction_task_num_per_disk;
-        if (count >= thread_per_disk) {
-            // Return if no available slot
-            need_pick_tablet = false;
-            if (!check_score) {
-                continue;
-            }
-        } else if (count >= thread_per_disk - 1) {
-            // Only one slot left, check if it can be assigned to base 
compaction task.
-            if (compaction_type == CompactionType::BASE_COMPACTION) {
-                if (copied_cumu_map[data_dir].empty()) {
-                    need_pick_tablet = false;
-                    if (!check_score) {
-                        continue;
-                    }
-                }
-            }
+
+        need_pick_tablet = need_generate_compaction_tasks(count, 
thread_per_disk, compaction_type,
+                                                          
copied_cumu_map[data_dir].empty());
+        if (!need_pick_tablet && !check_score) {
+            continue;
         }
 
         // Even if need_pick_tablet is false, we still need to call 
find_best_tablet_to_compaction(),
@@ -902,6 +927,9 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
                             ? copied_cumu_map[data_dir]
                             : copied_base_map[data_dir],
                     &disk_max_score, _cumulative_compaction_policies);
+            int concurrent_num = get_concurrent_per_disk(disk_max_score, 
thread_per_disk);
+            need_pick_tablet = need_generate_compaction_tasks(
+                    count, concurrent_num, compaction_type, 
copied_cumu_map[data_dir].empty());
             for (const auto& tablet : tablets) {
                 if (tablet != nullptr) {
                     if (need_pick_tablet) {
@@ -1007,17 +1035,6 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
     int64_t permits = 0;
     Status st = 
Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
                                                                  compaction, 
permits);
-    bool is_low_priority_task = [&]() {
-        // Can add more strategies to determine whether a task is a low 
priority task in the future
-        if (!config::enable_compaction_priority_scheduling) {
-            return false;
-        }
-        if (tablet->version_count() >=
-            (config::max_tablet_version_num * 
config::low_priority_tablet_version_num_ratio)) {
-            return false;
-        }
-        return !force;
-    }();
     if (st.ok() && permits > 0) {
         if (!force) {
             _permit_limiter.request(permits);
@@ -1027,18 +1044,8 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                         ? _cumu_compaction_thread_pool
                         : _base_compaction_thread_pool;
         auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
-                                            compaction_type, permits, force, 
is_low_priority_task,
-                                            this]() {
-            if (is_low_priority_task && 
!_increase_low_priority_task_nums(tablet->data_dir())) {
-                VLOG_DEBUG << "skip low priority compaction task for tablet: "
-                           << tablet->tablet_id();
-                // Todo: push task back
-            } else {
-                tablet->execute_compaction(*compaction);
-                if (is_low_priority_task) {
-                    _decrease_low_priority_task_nums(tablet->data_dir());
-                }
-            }
+                                            compaction_type, permits, force, 
this]() {
+            tablet->execute_compaction(*compaction);
             if (!force) {
                 _permit_limiter.release(permits);
             }
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index cbae6515bb6..5d50bb5f4df 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1530,26 +1530,6 @@ Status StorageEngine::_persist_broken_paths() {
     return Status::OK();
 }
 
-bool StorageEngine::_increase_low_priority_task_nums(DataDir* dir) {
-    if (!config::enable_compaction_priority_scheduling) {
-        return true;
-    }
-    std::lock_guard l(_low_priority_task_nums_mutex);
-    if (_low_priority_task_nums[dir] < 
config::low_priority_compaction_task_num_per_disk) {
-        _low_priority_task_nums[dir]++;
-        return true;
-    }
-    return false;
-}
-
-void StorageEngine::_decrease_low_priority_task_nums(DataDir* dir) {
-    if (config::enable_compaction_priority_scheduling) {
-        std::lock_guard l(_low_priority_task_nums_mutex);
-        _low_priority_task_nums[dir]--;
-        DCHECK(_low_priority_task_nums[dir] >= 0);
-    }
-}
-
 int CreateTabletIdxCache::get_index(const std::string& key) {
     auto* lru_handle = lookup(key);
     if (lru_handle) {
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index ee7db9494c2..e2fd2102419 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -869,6 +869,10 @@ void 
SystemMetrics::get_disks_io_time(std::map<std::string, int64_t>* map) {
     }
 }
 
+// Return the current value of the 1-minute load-average metric.
+double SystemMetrics::get_load_average_1_min() {
+    const auto& one_minute_gauge = _load_average_metrics->load_average_1_minutes;
+    return one_minute_gauge->value();
+}
+
 void SystemMetrics::get_network_traffic(std::map<std::string, int64_t>* 
send_map,
                                         std::map<std::string, int64_t>* 
rcv_map) {
     send_map->clear();
diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h
index 28db964296d..c72ba369301 100644
--- a/be/src/util/system_metrics.h
+++ b/be/src/util/system_metrics.h
@@ -57,6 +57,8 @@ public:
                              const std::map<std::string, int64_t>& lst_rcv_map,
                              int64_t interval_sec, int64_t* send_rate, 
int64_t* rcv_rate);
 
+    double get_load_average_1_min();
+
     void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& 
lst_value,
                                          int64_t interval_sec);
     void update_max_network_send_bytes_rate(int64_t max_send_bytes_rate);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to