This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9958eb91a3f branch-2.1: [fix](compaction) fix compaction producer hold 
for permits leak #45664 (#46753)
9958eb91a3f is described below

commit 9958eb91a3f18c5ec0d04269291d8209ecaff17e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 10 15:53:01 2025 +0800

    branch-2.1: [fix](compaction) fix compaction producer hold for permits leak 
#45664 (#46753)
    
    Cherry-picked from #45664
    
    Co-authored-by: shee <13843187+qz...@users.noreply.github.com>
    Co-authored-by: garenshi <garen...@tencent.com>
    Co-authored-by: camby <camby...@tencent.com>
---
 be/src/olap/olap_server.cpp | 36 ++++++++++++++++--------------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 71c8fb38681..014213c4694 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -633,20 +633,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
-            std::unique_ptr<ThreadPool>& thread_pool =
-                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
-                            ? _cumu_compaction_thread_pool
-                            : _base_compaction_thread_pool;
-            VLOG_CRITICAL << "compaction thread pool. type: "
-                          << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
-                                                                               
        : "BASE")
-                          << ", num_threads: " << thread_pool->num_threads()
-                          << ", num_threads_pending_start: "
-                          << thread_pool->num_threads_pending_start()
-                          << ", num_active_threads: " << 
thread_pool->num_active_threads()
-                          << ", max_threads: " << thread_pool->max_threads()
-                          << ", min_threads: " << thread_pool->min_threads()
-                          << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, 
check_score);
             if (tablets_compaction.size() == 0) {
@@ -1043,23 +1029,33 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                 (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                         ? _cumu_compaction_thread_pool
                         : _base_compaction_thread_pool;
+        VLOG_CRITICAL << "compaction thread pool. type: "
+                      << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                               
    : "BASE")
+                      << ", num_threads: " << thread_pool->num_threads()
+                      << ", num_threads_pending_start: " << 
thread_pool->num_threads_pending_start()
+                      << ", num_active_threads: " << 
thread_pool->num_active_threads()
+                      << ", max_threads: " << thread_pool->max_threads()
+                      << ", min_threads: " << thread_pool->min_threads()
+                      << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
         auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
                                             compaction_type, permits, force, 
this]() {
+            Defer defer {[&]() {
+                if (!force) {
+                    _permit_limiter.release(permits);
+                }
+                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+                tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
+            }};
             if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), 
compaction_type)) {
                 LOG(INFO) << "Tablet state has been changed, no need to begin 
this compaction "
                              "task, tablet_id="
                           << tablet->tablet_id() << ", tablet_state=" << 
tablet->tablet_state();
-                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
                 return;
             }
             tablet->compaction_stage = CompactionStage::EXECUTING;
             
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
             tablet->execute_compaction(*compaction);
-            if (!force) {
-                _permit_limiter.release(permits);
-            }
-            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
-            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
         });
         if (!st.ok()) {
             if (!force) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to