github-actions[bot] commented on code in PR #31215:
URL: https://github.com/apache/doris/pull/31215#discussion_r1507207577


##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +240,454 @@ void CloudStorageEngine::_sync_tablets_thread_callback() {
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+    std::lock_guard lock(_compaction_mtx);
+    if (auto it = _submitted_cumu_compactions.find(tablet_id);
+        it != _submitted_cumu_compactions.end()) {
+        res = it->second;
+    }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+    int base_thread_num = get_base_thread_num();
+    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status = 
_base_compaction_thread_pool->set_max_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << base_thread_num;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status = 
_base_compaction_thread_pool->set_min_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << base_thread_num;
+        }
+    }
+
+    int cumu_thread_num = get_cumu_thread_num();
+    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+}
+
// Background producer loop: periodically picks tablets that need compaction
// and submits compaction tasks, alternating between cumulative and base
// rounds. Runs until `_stop_background_threads_latch` is counted down.
void CloudStorageEngine::_compaction_tasks_producer_callback() {
    LOG(INFO) << "try to start compaction producer process!";

    // `round` counts consecutive cumulative rounds; after
    // `cumulative_compaction_rounds_for_each_base_compaction_round` of them,
    // one base round runs and the counter resets.
    int round = 0;
    CompactionType compaction_type;

    // Used to record the time when the score metric was last updated.
    // The update of the score metric is accompanied by the logic of selecting the tablet.
    // If there is no slot available, the logic of selecting the tablet will be terminated,
    // which causes the score metric update to be terminated.
    // In order to avoid this situation, we need to update the score regularly.
    int64_t last_cumulative_score_update_time = 0;
    int64_t last_base_score_update_time = 0;
    static const int64_t check_score_interval_ms = 5000; // 5 secs

    int64_t interval = config::generate_compaction_tasks_interval_ms;
    do {
        if (!config::disable_auto_compaction) {
            // Thread-pool sizes follow config, which may change at runtime.
            _adjust_compaction_thread_num();

            // Decide this round's compaction type and whether the score
            // metric should be refreshed (at most once per 5s per type).
            bool check_score = false;
            int64_t cur_time = UnixMillis();
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
                round++;
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_cumulative_score_update_time = cur_time;
                }
            } else {
                compaction_type = CompactionType::BASE_COMPACTION;
                round = 0;
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_base_score_update_time = cur_time;
                }
            }
            std::unique_ptr<ThreadPool>& thread_pool =
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                            ? _cumu_compaction_thread_pool
                            : _base_compaction_thread_pool;
            // Diagnostic snapshot of the selected pool's load before
            // generating this round's tasks.
            VLOG_CRITICAL << "compaction thread pool. type: "
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
                                                                                       : "BASE")
                          << ", num_threads: " << thread_pool->num_threads()
                          << ", num_threads_pending_start: "
                          << thread_pool->num_threads_pending_start()
                          << ", num_active_threads: " << thread_pool->num_active_threads()
                          << ", max_threads: " << thread_pool->max_threads()
                          << ", min_threads: " << thread_pool->min_threads()
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
            std::vector<CloudTabletSPtr> tablets_compaction =
                    _generate_cloud_compaction_tasks(compaction_type, check_score);

            /// Regardless of whether the tablet is submitted for compaction or not,
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
            /// NOTE(review): the comment above was inherited from the local storage
            /// engine; TabletManager::start_trash_sweep may not apply to the cloud
            /// engine — confirm against CloudTablet lifecycle.
            for (const auto& tablet : tablets_compaction) {
                Status st = submit_compaction_task(tablet, compaction_type);
                if (st.ok()) continue;
                // "No suitable version" is an expected, benign outcome; only
                // log it when verbose debugging is on.
                if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
                     !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
                    VLOG_DEBUG_IS_ON) {
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
                                 << tablet->tablet_id() << ", err: " << st;
                }
            }
            interval = config::generate_compaction_tasks_interval_ms;
        } else {
            // Auto compaction disabled: poll the config at a slower cadence.
            interval = config::check_auto_compaction_interval_seconds * 1000;
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
+
+std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(

Review Comment:
   warning: function '_generate_cloud_compaction_tasks' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(
                                                    ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_storage_engine.cpp:365:** 86 lines including whitespace 
and comments (threshold 80)
   ```cpp
   std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(
                                                    ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to