This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 73199122c2d [enhancement](compaction) Control the parallelism for 
urgent compaction tasks (#37782) (#38189)
73199122c2d is described below

commit 73199122c2dd43643bfe24010f3c24fb137ad004
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Mon Jul 22 17:22:53 2024 +0800

    [enhancement](compaction) Control the parallelism for urgent compaction 
tasks (#37782) (#38189)
    
    ## Proposed changes
    
    For some urgent compaction tasks, their submission should take
    parallelism into account.
    
    Currently, we apply the control policy specifically to data loading.
    Other sources of urgent tasks are treated as eager.
---
 be/src/agent/task_worker_pool.cpp |  3 ++-
 be/src/olap/olap_server.cpp       | 41 +++++++++++++++++++++++++++++++++------
 be/src/olap/storage_engine.h      |  2 +-
 3 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index c9d222114e0..59971f01a4d 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -40,6 +40,7 @@
 #include <sstream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -1586,7 +1587,7 @@ void 
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
                         if 
(tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
                             published_count % 20 == 0) {
                             auto st = _engine.submit_compaction_task(
-                                    tablet, 
CompactionType::CUMULATIVE_COMPACTION, true);
+                                    tablet, 
CompactionType::CUMULATIVE_COMPACTION, true, false);
                             if (!st.ok()) [[unlikely]] {
                                 LOG(WARNING) << "trigger compaction failed, 
tablet_id=" << tablet_id
                                              << ", published=" << 
published_count << " : " << st;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 038a5f2cd45..173fc227892 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -854,12 +854,12 @@ int StorageEngine::_get_executing_compaction_num(
     return num;
 }
 
-bool need_generate_compaction_tasks(int count, int thread_per_disk, 
CompactionType compaction_type,
-                                    bool all_base) {
-    if (count >= thread_per_disk) {
+bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
+                                    CompactionType compaction_type, bool 
all_base) {
+    if (task_cnt_per_disk >= thread_per_disk) {
         // Return if no available slot
         return false;
-    } else if (count >= thread_per_disk - 1) {
+    } else if (task_cnt_per_disk >= thread_per_disk - 1) {
         // Only one slot left, check if it can be assigned to base compaction 
task.
         if (compaction_type == CompactionType::BASE_COMPACTION) {
             if (all_base) {
@@ -912,7 +912,7 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
         copied_cumu_map = _tablet_submitted_cumu_compaction;
         copied_base_map = _tablet_submitted_base_compaction;
     }
-    for (auto data_dir : data_dirs) {
+    for (auto* data_dir : data_dirs) {
         bool need_pick_tablet = true;
         // We need to reserve at least one Slot for cumulative compaction.
         // So when there is only one Slot, we have to judge whether there is a 
cumulative compaction
@@ -1091,7 +1091,36 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
 }
 
 Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, 
CompactionType compaction_type,
-                                             bool force) {
+                                             bool force, bool eager) {
+    if (!eager) {
+        DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
+               compaction_type == CompactionType::CUMULATIVE_COMPACTION);
+        std::map<DataDir*, std::unordered_set<TabletSharedPtr>> 
copied_cumu_map;
+        std::map<DataDir*, std::unordered_set<TabletSharedPtr>> 
copied_base_map;
+        {
+            std::unique_lock<std::mutex> 
lock(_tablet_submitted_compaction_mutex);
+            copied_cumu_map = _tablet_submitted_cumu_compaction;
+            copied_base_map = _tablet_submitted_base_compaction;
+        }
+        auto stores = get_stores();
+
+        auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type,
+                          this](auto* data_dir) {
+            int count = 
_get_executing_compaction_num(copied_base_map[data_dir]) +
+                        
_get_executing_compaction_num(copied_cumu_map[data_dir]);
+            int paral = data_dir->is_ssd_disk() ? 
config::compaction_task_num_per_fast_disk
+                                                : 
config::compaction_task_num_per_disk;
+            bool all_base = copied_cumu_map[data_dir].empty();
+            return need_generate_compaction_tasks(count, paral, 
compaction_type, all_base);
+        };
+
+        bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred);
+        if (is_busy) {
+            LOG_EVERY_N(WARNING, 100)
+                    << "Too busy to submit a compaction task, tablet=" << 
tablet->get_table_id();
+            return Status::OK();
+        }
+    }
     _update_cumulative_compaction_policy();
     // alter table tableName set ("compaction_policy"="time_series")
     // if atler table's compaction  policy, we need to modify tablet 
compaction policy shared ptr
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 99e92828a0b..f647869e825 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -195,7 +195,7 @@ public:
     void check_cumulative_compaction_config();
 
     Status submit_compaction_task(TabletSharedPtr tablet, CompactionType 
compaction_type,
-                                  bool force);
+                                  bool force, bool eager = true);
     Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> 
worker,
                                       SegCompactionCandidatesSharedPtr 
segments);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to