This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9958eb91a3f branch-2.1: [fix](compaction) fix compaction producer hold for permits leak #45664 (#46753) 9958eb91a3f is described below commit 9958eb91a3f18c5ec0d04269291d8209ecaff17e Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Jan 10 15:53:01 2025 +0800 branch-2.1: [fix](compaction) fix compaction producer hold for permits leak #45664 (#46753) Cherry-picked from #45664 Co-authored-by: shee <13843187+qz...@users.noreply.github.com> Co-authored-by: garenshi <garen...@tencent.com> Co-authored-by: camby <camby...@tencent.com> --- be/src/olap/olap_server.cpp | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 71c8fb38681..014213c4694 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -633,20 +633,6 @@ void StorageEngine::_compaction_tasks_producer_callback() { last_base_score_update_time = cur_time; } } - std::unique_ptr<ThreadPool>& thread_pool = - (compaction_type == CompactionType::CUMULATIVE_COMPACTION) - ? _cumu_compaction_thread_pool - : _base_compaction_thread_pool; - VLOG_CRITICAL << "compaction thread pool. type: " - << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" - : "BASE") - << ", num_threads: " << thread_pool->num_threads() - << ", num_threads_pending_start: " - << thread_pool->num_threads_pending_start() - << ", num_active_threads: " << thread_pool->num_active_threads() - << ", max_threads: " << thread_pool->max_threads() - << ", min_threads: " << thread_pool->min_threads() - << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); std::vector<TabletSharedPtr> tablets_compaction = _generate_compaction_tasks(compaction_type, data_dirs, check_score); if (tablets_compaction.size() == 0) { @@ -1043,23 +1029,33 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, (compaction_type == CompactionType::CUMULATIVE_COMPACTION) ? _cumu_compaction_thread_pool : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction), compaction_type, permits, force, this]() { + Defer defer {[&]() { + if (!force) { + _permit_limiter.release(permits); + } + _pop_tablet_from_submitted_compaction(tablet, compaction_type); + tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; + }}; if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) { LOG(INFO) << "Tablet state has been changed, no need to begin this compaction " "task, tablet_id=" << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state(); - _pop_tablet_from_submitted_compaction(tablet, compaction_type); return; } tablet->compaction_stage = CompactionStage::EXECUTING; TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); tablet->execute_compaction(*compaction); - if (!force) { - _permit_limiter.release(permits); - } - _pop_tablet_from_submitted_compaction(tablet, compaction_type); - tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; }); if (!st.ok()) { if (!force) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org