This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c5625b783 [Improvement]Add more load cpu usage to workload group 
(#42053)
63c5625b783 is described below

commit 63c5625b783d3312fce59e997115b9de8367c808
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Oct 23 10:54:29 2024 +0800

    [Improvement]Add more load cpu usage to workload group (#42053)
    
    ## Proposed changes
    Account for more load CPU usage in the workload group:
    1. AsyncResultWriter's CPU usage.
    2. Memtable flush's CPU usage when the memtable is not on the sink side.
---
 be/src/olap/delta_writer.cpp                   |  7 ++++++-
 be/src/olap/delta_writer_v2.cpp                |  7 +++----
 be/src/olap/memtable_flush_executor.cpp        | 28 ++++++++++++--------------
 be/src/olap/memtable_flush_executor.h          | 13 ++++++------
 be/src/olap/memtable_writer.cpp                | 18 ++++-------------
 be/src/olap/memtable_writer.h                  |  3 ++-
 be/src/runtime/query_context.h                 |  2 +-
 be/src/runtime/workload_group/workload_group.h | 11 ++++++++++
 be/src/vec/sink/writer/async_result_writer.cpp | 11 ++++++++++
 9 files changed, 58 insertions(+), 42 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 00c622df59f..e0e3a5281bc 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -103,10 +103,15 @@ Status BaseDeltaWriter::init() {
     if (_is_init) {
         return Status::OK();
     }
+    auto* t_ctx = doris::thread_context(true);
+    std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
+    if (t_ctx) {
+        wg_sptr = t_ctx->workload_group().lock();
+    }
     RETURN_IF_ERROR(_rowset_builder->init());
     RETURN_IF_ERROR(_memtable_writer->init(
             _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
-            _rowset_builder->get_partial_update_info(), nullptr,
+            _rowset_builder->get_partial_update_info(), wg_sptr,
             _rowset_builder->tablet()->enable_unique_key_merge_on_write()));
     
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 0e83de2ca17..a6fb0154489 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -127,13 +127,12 @@ Status DeltaWriterV2::init() {
 
     _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
     RETURN_IF_ERROR(_rowset_writer->init(context));
-    ThreadPool* wg_thread_pool_ptr = nullptr;
+    std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
     if (_state->get_query_ctx()) {
-        wg_thread_pool_ptr = 
_state->get_query_ctx()->get_memtable_flush_pool();
+        wg_sptr = _state->get_query_ctx()->workload_group();
     }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, 
_partial_update_info,
-                                           wg_thread_pool_ptr,
-                                           
_streams[0]->enable_unique_mow(_req.index_id)));
+                                           wg_sptr, 
_streams[0]->enable_unique_mow(_req.index_id)));
     
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
     _streams.clear();
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index dc911647be8..5cdb45281b9 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -100,7 +100,16 @@ Status FlushToken::submit(std::shared_ptr<MemTable> 
mem_table) {
     int64_t submit_task_time = MonotonicNanos();
     auto task = MemtableFlushTask::create_shared(
             shared_from_this(), mem_table, 
_rowset_writer->allocate_segment_id(), submit_task_time);
-    Status ret = _thread_pool->submit(std::move(task));
+    // NOTE: we must guarantee the WorkloadGroup is not destructed while 
submitting a memtable flush task,
+    // because currently a WorkloadGroup is destroyed only after all 
queries in the group have finished,
+    // regardless of whether the load channel has finished.
+    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
+    ThreadPool* wg_thread_pool = nullptr;
+    if (wg_sptr) {
+        wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr();
+    }
+    Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
+                                : _thread_pool->submit(std::move(task));
     if (ret.ok()) {
         // _wait_running_task_finish was executed after this function, so no 
need to notify _cond here
         _stats.flush_running_count++;
@@ -236,7 +245,8 @@ void MemTableFlushExecutor::init(int num_disk) {
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
 Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& 
flush_token,
                                                  std::shared_ptr<RowsetWriter> 
rowset_writer,
-                                                 bool is_high_priority) {
+                                                 bool is_high_priority,
+                                                 
std::shared_ptr<WorkloadGroup> wg_sptr) {
     switch (rowset_writer->type()) {
     case ALPHA_ROWSET:
         // alpha rowset do not support flush in CONCURRENT.  and not support 
alpha rowset now.
@@ -244,7 +254,7 @@ Status 
MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
     case BETA_ROWSET: {
         // beta rowset can be flush in CONCURRENT, because each memtable using 
a new segment writer.
         ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : 
_flush_pool.get();
-        flush_token = FlushToken::create_shared(pool);
+        flush_token = FlushToken::create_shared(pool, wg_sptr);
         flush_token->set_rowset_writer(rowset_writer);
         return Status::OK();
     }
@@ -253,18 +263,6 @@ Status 
MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
     }
 }
 
-Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& 
flush_token,
-                                                 std::shared_ptr<RowsetWriter> 
rowset_writer,
-                                                 ThreadPool* 
wg_flush_pool_ptr) {
-    if (rowset_writer->type() == BETA_ROWSET) {
-        flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
-    } else {
-        return Status::InternalError<false>("not support alpha rowset load 
now.");
-    }
-    flush_token->set_rowset_writer(rowset_writer);
-    return Status::OK();
-}
-
 void MemTableFlushExecutor::_register_metrics() {
     REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
                          [this]() { return _flush_pool->get_queue_size(); });
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 25c5a37afba..27e8e8a9b0e 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -34,6 +34,7 @@ namespace doris {
 class DataDir;
 class MemTable;
 class RowsetWriter;
+class WorkloadGroup;
 
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
@@ -59,7 +60,8 @@ class FlushToken : public 
std::enable_shared_from_this<FlushToken> {
     ENABLE_FACTORY_CREATOR(FlushToken);
 
 public:
-    FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), 
_thread_pool(thread_pool) {}
+    FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
+            : _flush_status(Status::OK()), _thread_pool(thread_pool), 
_wg_wptr(wg_sptr) {}
 
     Status submit(std::shared_ptr<MemTable> mem_table);
 
@@ -108,6 +110,8 @@ private:
 
     std::mutex _mutex;
     std::condition_variable _cond;
+
+    std::weak_ptr<WorkloadGroup> _wg_wptr;
 };
 
 // MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -133,11 +137,8 @@ public:
     void init(int num_disk);
 
     Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
-                              std::shared_ptr<RowsetWriter> rowset_writer, 
bool is_high_priority);
-
-    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
-                              std::shared_ptr<RowsetWriter> rowset_writer,
-                              ThreadPool* wg_flush_pool_ptr);
+                              std::shared_ptr<RowsetWriter> rowset_writer, 
bool is_high_priority,
+                              std::shared_ptr<WorkloadGroup> wg_sptr);
 
 private:
     void _register_metrics();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index e8123c48ecc..88532646b66 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
 Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
                             TabletSchemaSPtr tablet_schema,
                             std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
-                            ThreadPool* wg_flush_pool_ptr, bool 
unique_key_mow) {
+                            std::shared_ptr<WorkloadGroup> wg_sptr, bool 
unique_key_mow) {
     _rowset_writer = rowset_writer;
     _tablet_schema = tablet_schema;
     _unique_key_mow = unique_key_mow;
@@ -77,19 +77,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> 
rowset_writer,
     // create flush handler
     // by assigning segment_id to memtable before submiting to flush executor,
     // we can make sure same keys sort in the same order in all replicas.
-    if (wg_flush_pool_ptr) {
-        RETURN_IF_ERROR(
-                ExecEnv::GetInstance()
-                        ->storage_engine()
-                        .memtable_flush_executor()
-                        ->create_flush_token(_flush_token, _rowset_writer, 
wg_flush_pool_ptr));
-    } else {
-        RETURN_IF_ERROR(
-                ExecEnv::GetInstance()
-                        ->storage_engine()
-                        .memtable_flush_executor()
-                        ->create_flush_token(_flush_token, _rowset_writer, 
_req.is_high_priority));
-    }
+    RETURN_IF_ERROR(
+            
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
+                    _flush_token, _rowset_writer, _req.is_high_priority, 
wg_sptr));
 
     _is_init = true;
     return Status::OK();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ec44348b4a9..fb07e740fa3 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -52,6 +52,7 @@ class SlotDescriptor;
 class OlapTableSchemaParam;
 class RowsetWriter;
 struct FlushStatistic;
+class WorkloadGroup;
 
 namespace vectorized {
 class Block;
@@ -67,7 +68,7 @@ public:
 
     Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr 
tablet_schema,
                 std::shared_ptr<PartialUpdateInfo> partial_update_info,
-                ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
+                std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = 
false);
 
     Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 9d499f3487e..1a05b784d5b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -255,7 +255,7 @@ private:
     // And will be shared by all instances of this query.
     // So that we can control the max thread that a query can be used to 
execute.
     // If this token is not set, the scanner will be executed in 
"_scan_thread_pool" in exec env.
-    std::unique_ptr<ThreadPoolToken> _thread_token;
+    std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
 
     void _init_query_mem_tracker();
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 933c5afdb4e..2ba84ce982b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -198,6 +198,17 @@ public:
     }
     int64_t get_remote_scan_bytes_per_second();
 
+    CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
+        std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+        return _cgroup_cpu_ctl.get();
+    }
+
+    ThreadPool* get_memtable_flush_pool_ptr() {
+        // No lock is taken here because this is called by memtable flush;
+        // this avoids lock contention with updates to the workload thread pool.
+        return _memtable_flush_pool.get();
+    }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 16dcbc648fb..432ec1c54b5 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -107,6 +107,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, 
RuntimeProfile* profi
         force_close(status);
     }
 
+    if (state && state->get_query_ctx()) {
+        WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
+        if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
+            Status ret = 
wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
+            if (ret.ok()) {
+                std::string wg_tname = "asyc_wr_" + wg_ptr->name();
+                Thread::set_self_name(wg_tname);
+            }
+        }
+    }
+
     DCHECK(_dependency);
     if (_writer_status.ok()) {
         while (true) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to