This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f5572ac732e [pick]reset memtable flush thread num (#37092)
f5572ac732e is described below

commit f5572ac732e1d40a2f2301cd270b3be854377c3f
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Jul 2 19:20:17 2024 +0800

    [pick]reset memtable flush thread num (#37092)
    
    ## Proposed changes
    
    pick #37028
---
 be/src/common/config.cpp                         |  2 +
 be/src/common/config.h                           |  3 ++
 be/src/olap/delta_writer_v2.cpp                  |  2 +-
 be/src/olap/storage_engine.cpp                   |  1 +
 be/src/olap/storage_engine.h                     |  4 ++
 be/src/runtime/fragment_mgr.cpp                  |  6 +--
 be/src/runtime/query_context.cpp                 |  6 +--
 be/src/runtime/query_context.h                   |  4 +-
 be/src/runtime/workload_group/workload_group.cpp | 47 +++++++++++++++---------
 be/src/runtime/workload_group/workload_group.h   |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp   | 27 ++++----------
 11 files changed, 55 insertions(+), 49 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 910bf69609e..563e4750165 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -671,6 +671,8 @@ DEFINE_Int32(flush_thread_num_per_store, "6");
 // number of thread for flushing memtable per store, for high priority load task
 DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
 
+DEFINE_Int32(wg_flush_thread_num_per_store, "6");
+
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
 DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2d0dc128a2a..21325a0f011 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -726,6 +726,9 @@ DECLARE_Int32(flush_thread_num_per_store);
 // number of thread for flushing memtable per store, for high priority load task
 DECLARE_Int32(high_priority_flush_thread_num_per_store);
 
+// workload group's flush thread num
+DECLARE_Int32(wg_flush_thread_num_per_store);
+
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
 DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 5cfc260d1b5..378728f025c 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -128,7 +128,7 @@ Status DeltaWriterV2::init() {
     RETURN_IF_ERROR(_rowset_writer->init(context));
     ThreadPool* wg_thread_pool_ptr = nullptr;
     if (_state->get_query_ctx()) {
-        wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
+        wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
     }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
                                            wg_thread_pool_ptr,
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index b838af570a2..91c297b1960 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -181,6 +181,7 @@ Status StorageEngine::_open() {
     RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed");
 
     auto dirs = get_stores<false>();
+    _disk_num = dirs.size();
     RETURN_IF_ERROR(load_data_dirs(dirs));
 
     _memtable_flush_executor.reset(new MemTableFlushExecutor());
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index f2b5f421670..9dc18dfb276 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -224,6 +224,8 @@ public:
 
     std::set<string> get_broken_paths() { return _broken_paths; }
 
+    int get_disk_num() { return _disk_num; }
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -469,6 +471,8 @@ private:
 
     std::unique_ptr<CreateTabletIdxCache> _create_tablet_idx_lru_cache;
 
+    int _disk_num {-1};
+
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
 };
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 08de61f8931..bd5308aeba1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -775,11 +775,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
                 std::make_pair(params.params.fragment_instance_id, fragment_executor));
     }
 
-    auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool();
-    if (!current_thread_pool) {
-        current_thread_pool = _thread_pool.get();
-    }
-    auto st = current_thread_pool->submit_func([this, fragment_executor, cb]() {
+    auto st = _thread_pool->submit_func([this, fragment_executor, cb]() {
 #ifndef BE_TEST
         SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
 #endif
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index bbcdc3b4771..40518e62cc8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -307,9 +307,9 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     return _exec_env->pipeline_task_scheduler();
 }
 
-ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
+ThreadPool* QueryContext::get_memtable_flush_pool() {
     if (_workload_group) {
-        return _non_pipe_thread_pool;
+        return _memtable_flush_pool;
     } else {
         return nullptr;
     }
@@ -321,7 +321,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
     // see task_group_manager::delete_workload_group_by_ids
     _workload_group->add_mem_tracker_limiter(query_mem_tracker);
     _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
-                                         &_non_pipe_thread_pool, &_remote_scan_task_scheduler);
+                                         &_memtable_flush_pool, &_remote_scan_task_scheduler);
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 2653eeddc8b..82e75b72cee 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -236,7 +236,7 @@ public:
 
     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
 
-    ThreadPool* get_non_pipe_exec_thread_pool();
+    ThreadPool* get_memtable_flush_pool();
 
     int64_t mem_limit() const { return _bytes_limit; }
 
@@ -337,7 +337,7 @@ private:
 
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
-    ThreadPool* _non_pipe_thread_pool = nullptr;
+    ThreadPool* _memtable_flush_pool = nullptr;
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index fd885aaacb1..372eecafd07 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -27,6 +27,7 @@
 #include <utility>
 
 #include "common/logging.h"
+#include "olap/storage_engine.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
@@ -429,19 +430,29 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                                   min_remote_scan_thread_num);
     }
 
-    if (_non_pipe_thread_pool == nullptr) {
-        std::unique_ptr<ThreadPool> thread_pool = nullptr;
-        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
-                           .set_min_threads(1)
-                           .set_max_threads(config::fragment_pool_thread_num_max)
-                           .set_max_queue_size(config::fragment_pool_queue_size)
-                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
-                           .build(&thread_pool);
-        if (!ret.ok()) {
-            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
-                      << tg_id;
-        } else {
-            _non_pipe_thread_pool = std::move(thread_pool);
+    if (_memtable_flush_pool == nullptr) {
+        int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num();
+        // -1 means disk num may not be inited, so not create flush pool
+        if (num_disk != -1) {
+            std::unique_ptr<ThreadPool> thread_pool = nullptr;
+
+            size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store);
+            size_t max_threads = num_disk * min_threads;
+            std::string pool_name = "wg_flush_" + tg_name;
+            auto ret = ThreadPoolBuilder(pool_name)
+                               .set_min_threads(min_threads)
+                               .set_max_threads(max_threads)
+                               .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                               .build(&thread_pool);
+            if (!ret.ok()) {
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
+                          << tg_id;
+            } else {
+                _memtable_flush_pool = std::move(thread_pool);
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
+                          << ", max thread num=" << max_threads
+                          << ", min thread num=" << min_threads;
+            }
         }
     }
 
@@ -469,13 +480,13 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
 
 void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                                         vectorized::SimplifiedScanScheduler** scan_sched,
-                                        ThreadPool** non_pipe_thread_pool,
+                                        ThreadPool** memtable_flush_pool,
                                         vectorized::SimplifiedScanScheduler** remote_scan_sched) {
     std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
     *exec_sched = _task_sched.get();
     *scan_sched = _scan_task_sched.get();
     *remote_scan_sched = _remote_scan_task_sched.get();
-    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+    *memtable_flush_pool = _memtable_flush_pool.get();
 }
 
 void WorkloadGroup::try_stop_schedulers() {
@@ -489,9 +500,9 @@ void WorkloadGroup::try_stop_schedulers() {
     if (_remote_scan_task_sched) {
         _remote_scan_task_sched->stop();
     }
-    if (_non_pipe_thread_pool) {
-        _non_pipe_thread_pool->shutdown();
-        _non_pipe_thread_pool->wait();
+    if (_memtable_flush_pool) {
+        _memtable_flush_pool->shutdown();
+        _memtable_flush_pool->wait();
     }
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index cee35c66af8..e41c1793233 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -190,7 +190,7 @@ private:
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
-    std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
+    std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 4ed878a4634..2982bf8174a 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -96,25 +96,14 @@ Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* prof
     // This is a async thread, should lock the task ctx, to make sure runtimestate and profile
     // not deconstructed before the thread exit.
     auto task_ctx = state->get_task_execution_context();
-    if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
-        ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
-        RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() {
-            auto task_lock = task_ctx.lock();
-            if (task_lock == nullptr) {
-                return;
-            }
-            this->process_block(state, profile);
-        }));
-    } else {
-        RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-                [this, state, profile, task_ctx]() {
-                    auto task_lock = task_ctx.lock();
-                    if (task_lock == nullptr) {
-                        return;
-                    }
-                    this->process_block(state, profile);
-                }));
-    }
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to