This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 44d9aa43154 [opt](resource) step3: Remove WorkloadQueryInfo, replaced 
by ResourceContext (#47960)
44d9aa43154 is described below

commit 44d9aa43154b4c740a4d3667c75627eb5a256b71
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Mon Feb 17 18:55:19 2025 +0800

    [opt](resource) step3: Remove WorkloadQueryInfo, replaced by 
ResourceContext (#47960)
    
    ### What problem does this PR solve?
    
    Modify `WorkloadSchedPolicy`, `WorkloadSchedPolicyMgr`, `WorkloadAction`
---
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/runtime/fragment_mgr.cpp                    | 12 +---
 be/src/runtime/fragment_mgr.h                      |  2 +-
 be/src/runtime/load_channel.cpp                    |  2 +-
 be/src/runtime/query_context.cpp                   |  2 +-
 be/src/runtime/query_context.h                     |  4 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 22 ------
 be/src/runtime/runtime_query_statistics_mgr.h      |  5 --
 be/src/runtime/thread_context.cpp                  |  3 +-
 be/src/runtime/thread_context.h                    | 84 +++++++++-------------
 be/src/runtime/workload_management/cpu_context.cpp |  5 +-
 .../workload_management/resource_context.cpp       |  6 +-
 .../runtime/workload_management/resource_context.h | 10 +--
 .../workload_management/workload_action.cpp        | 15 ++--
 .../runtime/workload_management/workload_action.h  | 29 ++++++--
 .../workload_management/workload_group_context.h   | 45 ------------
 .../workload_management/workload_query_info.h      | 37 ----------
 .../workload_management/workload_sched_policy.cpp  | 52 ++++++++++----
 .../workload_management/workload_sched_policy.h    | 10 +--
 .../workload_sched_policy_mgr.cpp                  | 26 ++++---
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 12 ++--
 21 files changed, 149 insertions(+), 236 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 69c2256a5eb..992dac27ec8 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -106,7 +106,7 @@ Status BaseDeltaWriter::init() {
     auto* t_ctx = doris::thread_context(true);
     std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
     if (t_ctx && t_ctx->is_attach_task()) {
-        wg_sptr = 
t_ctx->resource_ctx()->workload_group_context()->workload_group();
+        wg_sptr = t_ctx->resource_ctx()->workload_group();
     }
     RETURN_IF_ERROR(_rowset_builder->init());
     RETURN_IF_ERROR(_memtable_writer->init(
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4dfa966b20b..009dc34bb96 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -79,7 +79,6 @@
 #include "runtime/types.h"
 #include "runtime/workload_group/workload_group.h"
 #include "runtime/workload_group/workload_group_manager.h"
-#include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
@@ -1399,18 +1398,13 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
     return merge_status;
 }
 
-void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
query_info_list) {
+void FragmentMgr::get_runtime_query_info(
+        std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
     _query_ctx_map.apply(
             [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& 
map) -> Status {
                 for (auto iter = map.begin(); iter != map.end();) {
                     if (auto q_ctx = iter->second.lock()) {
-                        WorkloadQueryInfo workload_query_info;
-                        workload_query_info.query_id = print_id(iter->first);
-                        workload_query_info.tquery_id = iter->first;
-                        workload_query_info.wg_id = q_ctx->workload_group() == 
nullptr
-                                                            ? -1
-                                                            : 
q_ctx->workload_group()->id();
-                        query_info_list->push_back(workload_query_info);
+                        _resource_ctx_list->push_back(q_ctx->resource_ctx());
                         iter++;
                     } else {
                         iter = map.erase(iter);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 78f117a15db..9e37b7811f0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -176,7 +176,7 @@ public:
     std::string dump_pipeline_tasks(int64_t duration = 0);
     std::string dump_pipeline_tasks(TUniqueId& query_id);
 
-    void get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
_query_info_list);
+    void get_runtime_query_info(std::vector<std::weak_ptr<ResourceContext>>* 
_resource_ctx_list);
 
     Status get_realtime_exec_status(const TUniqueId& query_id,
                                     TReportExecStatusParams* exec_status);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 99f41899215..ce10666d84c 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -62,7 +62,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t 
timeout_s, bool is_hig
             wg_ptr = 
ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id);
             if (wg_ptr != nullptr) {
                 wg_ptr->add_mem_tracker_limiter(mem_tracker);
-                
_resource_ctx->workload_group_context()->set_workload_group(wg_ptr);
+                _resource_ctx->set_workload_group(wg_ptr);
             }
         }
     }
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a6cccc22091..9334db576ff 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -351,7 +351,7 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
 }
 
 void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
-    _resource_ctx->workload_group_context()->set_workload_group(wg);
+    _resource_ctx->set_workload_group(wg);
     // Should add query first, then the workload group will not be deleted.
     // see task_group_manager::delete_workload_group_by_ids
     workload_group()->add_mem_tracker_limiter(query_mem_tracker());
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c02cf97bf1b..f8515f762d3 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -260,9 +260,7 @@ public:
 
     bool is_nereids() const { return _is_nereids; }
 
-    WorkloadGroupPtr workload_group() const {
-        return _resource_ctx->workload_group_context()->workload_group();
-    }
+    WorkloadGroupPtr workload_group() const { return 
_resource_ctx->workload_group(); }
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
         return _resource_ctx->memory_context()->mem_tracker();
     }
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index f09558f7b9c..aa1e8546e02 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -453,28 +453,6 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
     }
 }
 
-void RuntimeQueryStatisticsMgr::get_metric_map(
-        std::string query_id, std::map<WorkloadMetricType, std::string>& 
metric_map) {
-    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
-    if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end()) 
{
-        auto* resource_ctx = _resource_contexts_map.at(query_id).get();
-        metric_map.emplace(
-                WorkloadMetricType::QUERY_TIME,
-                std::to_string(MonotonicMillis() - 
resource_ctx->task_controller()->finish_time()));
-        metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
-                           
std::to_string(resource_ctx->io_context()->scan_rows()));
-        metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
-                           
std::to_string(resource_ctx->io_context()->scan_bytes()));
-        metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
-                           
std::to_string(resource_ctx->memory_context()->current_memory_bytes()));
-    } else {
-        metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1");
-        metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1");
-        metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1");
-        metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1");
-    }
-}
-
 void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* 
block) {
     std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
     int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index 71a93ee9220..0bcdd647373 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -56,11 +56,6 @@ public:
 
     void report_runtime_query_statistics();
 
-    // used for workload scheduler policy
-    // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into 
WorkloadGroupMgr.
-    void get_metric_map(std::string query_id,
-                        std::map<WorkloadMetricType, std::string>& metric_map);
-
     // used for backend_active_tasks
     void get_active_be_tasks_block(vectorized::Block* block);
 
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 09394c8a721..266515ae2a6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -64,8 +64,7 @@ SwitchResourceContext::SwitchResourceContext(const 
std::shared_ptr<ResourceConte
         old_resource_ctx_ = thread_context()->resource_ctx();
         thread_context()->resource_ctx_ = rc;
         thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
-                rc->memory_context()->mem_tracker(),
-                rc->workload_group_context()->workload_group());
+                rc->memory_context()->mem_tracker(), rc->workload_group());
     }
 }
 
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 05cef5d5f9b..fbdbe397d40 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -111,54 +111,41 @@
         __VA_ARGS__;                                                           
         \
     } while (0)
 
-#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)                              
         \
-    std::shared_ptr<IOThrottle> iot = nullptr;                                 
         \
-    auto* t_ctx = doris::thread_context(true);                                 
         \
-    if (t_ctx && t_ctx->is_attach_task() &&                                    
         \
-        t_ctx->resource_ctx()->workload_group_context()->workload_group() != 
nullptr) { \
-        iot = t_ctx->resource_ctx()                                            
         \
-                      ->workload_group_context()                               
         \
-                      ->workload_group()                                       
         \
-                      ->get_local_scan_io_throttle(data_dir);                  
         \
-    }                                                                          
         \
-    if (iot) {                                                                 
         \
-        iot->acquire(-1);                                                      
         \
-    }                                                                          
         \
-    Defer defer {                                                              
         \
-        [&]() {                                                                
         \
-            if (iot) {                                                         
         \
-                iot->update_next_io_time(*bytes_read);                         
         \
-                t_ctx->resource_ctx()                                          
         \
-                        ->workload_group_context()                             
         \
-                        ->workload_group()                                     
         \
-                        ->update_local_scan_io(data_dir, *bytes_read);         
         \
-            }                                                                  
         \
-        }                                                                      
         \
+#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)                              
                   \
+    std::shared_ptr<IOThrottle> iot = nullptr;                                 
                   \
+    auto* t_ctx = doris::thread_context(true);                                 
                   \
+    if (t_ctx && t_ctx->is_attach_task() && 
t_ctx->resource_ctx()->workload_group() != nullptr) { \
+        iot = 
t_ctx->resource_ctx()->workload_group()->get_local_scan_io_throttle(data_dir);  
    \
+    }                                                                          
                   \
+    if (iot) {                                                                 
                   \
+        iot->acquire(-1);                                                      
                   \
+    }                                                                          
                   \
+    Defer defer {                                                              
                   \
+        [&]() {                                                                
                   \
+            if (iot) {                                                         
                   \
+                iot->update_next_io_time(*bytes_read);                         
                   \
+                
t_ctx->resource_ctx()->workload_group()->update_local_scan_io(data_dir,         
  \
+                                                                              
*bytes_read);       \
+            }                                                                  
                   \
+        }                                                                      
                   \
     }
 
-#define LIMIT_REMOTE_SCAN_IO(bytes_read)                                       
         \
-    std::shared_ptr<IOThrottle> iot = nullptr;                                 
         \
-    auto* t_ctx = doris::thread_context(true);                                 
         \
-    if (t_ctx && t_ctx->is_attach_task() &&                                    
         \
-        t_ctx->resource_ctx()->workload_group_context()->workload_group() != 
nullptr) { \
-        iot = t_ctx->resource_ctx()                                            
         \
-                      ->workload_group_context()                               
         \
-                      ->workload_group()                                       
         \
-                      ->get_remote_scan_io_throttle();                         
         \
-    }                                                                          
         \
-    if (iot) {                                                                 
         \
-        iot->acquire(-1);                                                      
         \
-    }                                                                          
         \
-    Defer defer {                                                              
         \
-        [&]() {                                                                
         \
-            if (iot) {                                                         
         \
-                iot->update_next_io_time(*bytes_read);                         
         \
-                t_ctx->resource_ctx()                                          
         \
-                        ->workload_group_context()                             
         \
-                        ->workload_group()                                     
         \
-                        ->update_remote_scan_io(*bytes_read);                  
         \
-            }                                                                  
         \
-        }                                                                      
         \
+#define LIMIT_REMOTE_SCAN_IO(bytes_read)                                       
                   \
+    std::shared_ptr<IOThrottle> iot = nullptr;                                 
                   \
+    auto* t_ctx = doris::thread_context(true);                                 
                   \
+    if (t_ctx && t_ctx->is_attach_task() && 
t_ctx->resource_ctx()->workload_group() != nullptr) { \
+        iot = 
t_ctx->resource_ctx()->workload_group()->get_remote_scan_io_throttle();         
    \
+    }                                                                          
                   \
+    if (iot) {                                                                 
                   \
+        iot->acquire(-1);                                                      
                   \
+    }                                                                          
                   \
+    Defer defer {                                                              
                   \
+        [&]() {                                                                
                   \
+            if (iot) {                                                         
                   \
+                iot->update_next_io_time(*bytes_read);                         
                   \
+                
t_ctx->resource_ctx()->workload_group()->update_remote_scan_io(*bytes_read);    
  \
+            }                                                                  
                   \
+        }                                                                      
                   \
     }
 
 namespace doris {
@@ -195,9 +182,8 @@ public:
         // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
         DCHECK(resource_ctx_ == nullptr);
         resource_ctx_ = rc;
-        thread_mem_tracker_mgr->attach_limiter_tracker(
-                rc->memory_context()->mem_tracker(),
-                rc->workload_group_context()->workload_group());
+        
thread_mem_tracker_mgr->attach_limiter_tracker(rc->memory_context()->mem_tracker(),
+                                                       rc->workload_group());
         thread_mem_tracker_mgr->enable_wait_gc();
     }
 
diff --git a/be/src/runtime/workload_management/cpu_context.cpp 
b/be/src/runtime/workload_management/cpu_context.cpp
index b6bbcc0da8a..ee7a0024b0c 100644
--- a/be/src/runtime/workload_management/cpu_context.cpp
+++ b/be/src/runtime/workload_management/cpu_context.cpp
@@ -25,9 +25,8 @@ namespace doris {
 
 void CPUContext::update_cpu_cost_ms(int64_t delta) const {
     stats_.cpu_cost_ms_counter_->update(delta);
-    if (resource_ctx_ != nullptr &&
-        resource_ctx_->workload_group_context()->workload_group() != nullptr) {
-        
resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta);
+    if (resource_ctx_ != nullptr && resource_ctx_->workload_group() != 
nullptr) {
+        resource_ctx_->workload_group()->update_cpu_time(delta);
     }
 }
 
diff --git a/be/src/runtime/workload_management/resource_context.cpp 
b/be/src/runtime/workload_management/resource_context.cpp
index 765e4615505..f78ac56e9f4 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -48,7 +48,11 @@ void 
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
     statistics->__set_scan_bytes_from_remote_storage(
             io_context()->scan_bytes_from_remote_storage());
     
statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
-    
statistics->__set_workload_group_id(workload_group_context()->workload_group_id());
+    if (workload_group() != nullptr) {
+        statistics->__set_workload_group_id(workload_group()->id());
+    } else {
+        statistics->__set_workload_group_id(-1);
+    }
 }
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h 
b/be/src/runtime/workload_management/resource_context.h
index 0c163f515cb..64d59289db8 100644
--- a/be/src/runtime/workload_management/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -28,7 +28,6 @@
 #include "runtime/workload_management/io_context.h"
 #include "runtime/workload_management/memory_context.h"
 #include "runtime/workload_management/task_controller.h"
-#include "runtime/workload_management/workload_group_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -46,7 +45,6 @@ public:
         cpu_context_ = CPUContext::create_unique();
         memory_context_ = MemoryContext::create_unique();
         io_context_ = IOContext::create_unique();
-        workload_group_context_ = WorkloadGroupContext::create_unique();
         task_controller_ = TaskController::create_unique();
 
         cpu_context_->set_resource_ctx(this);
@@ -59,8 +57,8 @@ public:
     CPUContext* cpu_context() const { return cpu_context_.get(); }
     MemoryContext* memory_context() const { return memory_context_.get(); }
     IOContext* io_context() const { return io_context_.get(); }
-    WorkloadGroupContext* workload_group_context() const { return 
workload_group_context_.get(); }
     TaskController* task_controller() const { return task_controller_.get(); }
+    WorkloadGroupPtr workload_group() const { return _workload_group; }
 
     void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) {
         cpu_context_ = std::move(cpu_context);
@@ -74,12 +72,10 @@ public:
         io_context_ = std::move(io_context);
         io_context_->set_resource_ctx(this);
     }
-    void set_workload_group_context(std::unique_ptr<WorkloadGroupContext> 
wg_context) {
-        workload_group_context_ = std::move(wg_context);
-    }
     void set_task_controller(std::unique_ptr<TaskController> task_controller) {
         task_controller_ = std::move(task_controller);
     }
+    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
 
     RuntimeProfile* profile() { return 
const_cast<RuntimeProfile*>(resource_profile_.get().get()); }
 
@@ -109,9 +105,9 @@ private:
     std::unique_ptr<CPUContext> cpu_context_ = nullptr;
     std::unique_ptr<MemoryContext> memory_context_ = nullptr;
     std::unique_ptr<IOContext> io_context_ = nullptr;
-    std::unique_ptr<WorkloadGroupContext> workload_group_context_ = nullptr;
     std::unique_ptr<TaskController> task_controller_ = nullptr;
 
+    WorkloadGroupPtr _workload_group = nullptr;
     MultiVersion<RuntimeProfile> resource_profile_;
 };
 
diff --git a/be/src/runtime/workload_management/workload_action.cpp 
b/be/src/runtime/workload_management/workload_action.cpp
index 77042b074fd..f9e42b43afa 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -21,18 +21,19 @@
 
 namespace doris {
 
-void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
+void WorkloadActionCancelQuery::exec(WorkloadAction::RuntimeContext* 
action_runtime_ctx) {
     std::stringstream msg;
-    msg << "query " << query_info->query_id
-        << " cancelled by workload policy: " << query_info->policy_name
-        << ", id:" << query_info->policy_id << ", " << 
query_info->cond_eval_msg;
+    msg << "query " << 
print_id(action_runtime_ctx->resource_ctx->task_controller()->task_id())
+        << " cancelled by workload policy: " << action_runtime_ctx->policy_name
+        << ", id:" << action_runtime_ctx->policy_id << ", " << 
action_runtime_ctx->cond_eval_msg;
     std::string msg_str = msg.str();
     LOG(INFO) << "[workload_schedule]" << msg_str;
-    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id,
-                                                         
Status::InternalError<false>(msg_str));
+    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+            action_runtime_ctx->resource_ctx->task_controller()->task_id(),
+            Status::InternalError<false>(msg_str));
 }
 
-void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
+void WorkloadActionMoveQuery::exec(WorkloadAction::RuntimeContext* 
action_runtime_ctx) {
     LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
 };
 
diff --git a/be/src/runtime/workload_management/workload_action.h 
b/be/src/runtime/workload_management/workload_action.h
index 785acc73c3a..4943b13a5d5 100644
--- a/be/src/runtime/workload_management/workload_action.h
+++ b/be/src/runtime/workload_management/workload_action.h
@@ -17,36 +17,53 @@
 
 #pragma once
 
-#include "runtime/workload_management/workload_query_info.h"
+#include <gen_cpp/BackendService_types.h>
+#include <glog/logging.h>
 
 namespace doris {
 
+class ResourceContext;
+
 enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 };
 
 class WorkloadAction {
 public:
+    // only used as a temporary variable in 
`WorkloadSchedPolicyMgr::_schedule_workload`.
+    struct RuntimeContext {
+    public:
+        RuntimeContext(const std::shared_ptr<ResourceContext>& ctx) : 
resource_ctx(ctx) {}
+
+        int64_t policy_id;
+        std::string policy_name;
+        std::string cond_eval_msg;
+
+        std::shared_ptr<ResourceContext> resource_ctx;
+    };
+
     WorkloadAction() = default;
     virtual ~WorkloadAction() = default;
 
-    virtual void exec(WorkloadQueryInfo* query_info) = 0;
+    virtual void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) = 0;
 
     virtual WorkloadActionType get_action_type() = 0;
 };
 
 class WorkloadActionCancelQuery : public WorkloadAction {
 public:
-    void exec(WorkloadQueryInfo* query_info) override;
+    void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override;
 
-    WorkloadActionType get_action_type() override { return CANCEL_QUERY; }
+    WorkloadActionType get_action_type() override { return 
WorkloadActionType::CANCEL_QUERY; }
 };
 
 //todo(wb) implement it
 class WorkloadActionMoveQuery : public WorkloadAction {
 public:
     WorkloadActionMoveQuery(std::string wg_name) : _wg_name(wg_name) {}
-    void exec(WorkloadQueryInfo* query_info) override;
+    void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override;
 
-    WorkloadActionType get_action_type() override { return 
MOVE_QUERY_TO_GROUP; }
+    WorkloadActionType get_action_type() override {
+        return WorkloadActionType::MOVE_QUERY_TO_GROUP;
+    }
 
 private:
     std::string _wg_name;
diff --git a/be/src/runtime/workload_management/workload_group_context.h 
b/be/src/runtime/workload_management/workload_group_context.h
deleted file mode 100644
index 38cad310925..00000000000
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/factory_creator.h"
-#include "runtime/workload_group/workload_group.h"
-
-namespace doris {
-
-class WorkloadGroupContext {
-    ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
-
-public:
-    WorkloadGroupContext() = default;
-    virtual ~WorkloadGroupContext() = default;
-
-    int64_t workload_group_id() {
-        if (workload_group() != nullptr) {
-            return workload_group()->id();
-        }
-        return -1;
-    }
-    WorkloadGroupPtr workload_group() { return _workload_group; }
-    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
-
-protected:
-    WorkloadGroupPtr _workload_group = nullptr;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/workload_management/workload_query_info.h 
b/be/src/runtime/workload_management/workload_query_info.h
deleted file mode 100644
index 16151eec390..00000000000
--- a/be/src/runtime/workload_management/workload_query_info.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-
-#include "runtime/workload_management/workload_condition.h"
-
-namespace doris {
-
-class WorkloadQueryInfo {
-public:
-    std::map<WorkloadMetricType, std::string> metric_map;
-    TUniqueId tquery_id;
-    std::string query_id;
-    int64_t wg_id;
-    int64_t policy_id;
-    std::string policy_name {""};
-    std::string cond_eval_msg {""};
-};
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp 
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 63b9362bc21..597daafa6bc 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/workload_management/workload_sched_policy.h"
 
+#include "runtime/workload_management/resource_context.h"
+#include "util/time.h"
+
 namespace doris {
 
 void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool 
enabled,
@@ -47,26 +50,47 @@ void WorkloadSchedPolicy::init(int64_t id, std::string 
name, int version, bool e
     }
 }
 
-bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
+bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* 
action_runtime_ctx) const {
     if (!_enabled) {
         return false;
     }
 
     // 1 when policy has no group(_wg_id_set.size() < 0), it should match all 
query
     // 2 when policy has group, it can only match the query which has the same 
group
-    if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 ||
-                                  _wg_id_set.find(query_info_ptr->wg_id) == 
_wg_id_set.end())) {
+    if (!_wg_id_set.empty() &&
+        (action_runtime_ctx->resource_ctx->workload_group() == nullptr ||
+         
_wg_id_set.find(action_runtime_ctx->resource_ctx->workload_group()->id()) ==
+                 _wg_id_set.end())) {
         return false;
     }
 
-    auto& metric_val_map = query_info_ptr->metric_map;
-    std::string cond_eval_msg = "";
-    for (auto& cond : _condition_list) {
-        if (metric_val_map.find(cond->get_workload_metric_type()) == 
metric_val_map.end()) {
+    std::string cond_eval_msg;
+    for (const auto& cond : _condition_list) {
+        std::string val;
+        switch (cond->get_workload_metric_type()) {
+        case WorkloadMetricType::QUERY_TIME: {
+            val = std::to_string(
+                    MonotonicMillis() -
+                    
action_runtime_ctx->resource_ctx->task_controller()->finish_time());
+            break;
+        }
+        case WorkloadMetricType::SCAN_BYTES: {
+            val = 
std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes());
+            break;
+        }
+        case WorkloadMetricType::SCAN_ROWS: {
+            val = 
std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows());
+            break;
+        }
+        case WorkloadMetricType::QUERY_MEMORY_BYTES: {
+            val = std::to_string(
+                    
action_runtime_ctx->resource_ctx->memory_context()->current_memory_bytes());
+            break;
+        }
+        default:
             return false;
         }
 
-        std::string val = metric_val_map.at(cond->get_workload_metric_type());
         if (!cond->eval(val)) {
             return false;
         }
@@ -74,15 +98,15 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* 
query_info_ptr) {
                          cond->get_metric_value_string() + "), ";
     }
     cond_eval_msg = cond_eval_msg.substr(0, cond_eval_msg.size() - 2);
-    query_info_ptr->cond_eval_msg = cond_eval_msg;
+    action_runtime_ctx->cond_eval_msg = cond_eval_msg;
     return true;
 }
 
-void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
-    for (int i = 0; i < _action_list.size(); i++) {
-        query_info->policy_id = this->_id;
-        query_info->policy_name = this->_name;
-        _action_list[i]->exec(query_info);
+void WorkloadSchedPolicy::exec_action(WorkloadAction::RuntimeContext* 
action_runtime_ctx) {
+    for (auto& action : _action_list) {
+        action_runtime_ctx->policy_id = this->_id;
+        action_runtime_ctx->policy_name = this->_name;
+        action->exec(action_runtime_ctx);
     }
 }
 
diff --git a/be/src/runtime/workload_management/workload_sched_policy.h 
b/be/src/runtime/workload_management/workload_sched_policy.h
index 6554634d9af..ce1e4b6655e 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.h
+++ b/be/src/runtime/workload_management/workload_sched_policy.h
@@ -34,16 +34,16 @@ public:
               std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
               std::vector<std::unique_ptr<WorkloadAction>> action_list);
 
-    bool enabled() { return _enabled; }
-    int priority() { return _priority; }
+    bool enabled() const { return _enabled; }
+    int priority() const { return _priority; }
 
-    bool is_match(WorkloadQueryInfo* query_info);
+    bool is_match(WorkloadAction::RuntimeContext* action_runtime_ctx) const;
 
     WorkloadActionType get_first_action_type() { return _first_action_type; }
 
-    void exec_action(WorkloadQueryInfo* query_info);
+    void exec_action(WorkloadAction::RuntimeContext* action_runtime_ctx);
 
-    int version() { return _version; }
+    int version() const { return _version; }
 
 private:
     int64_t _id;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index 4690ed1d4f2..4802704c136 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -17,8 +17,10 @@
 
 #include "runtime/workload_management/workload_sched_policy_mgr.h"
 
+#include <memory>
+
 #include "runtime/fragment_mgr.h"
-#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/workload_management/resource_context.h"
 
 namespace doris {
 
@@ -75,18 +77,20 @@ void WorkloadSchedPolicyMgr::update_workload_sched_policy(
 
 void WorkloadSchedPolicyMgr::_schedule_workload() {
     while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
-        // 1 get query info
-        std::vector<WorkloadQueryInfo> list;
+        // 1 get query resource context
+        std::vector<std::weak_ptr<ResourceContext>> list;
         _exec_env->fragment_mgr()->get_runtime_query_info(&list);
         // todo: add timer
-        if (list.size() == 0) {
+        if (list.empty()) {
             continue;
         }
 
-        for (int i = 0; i < list.size(); i++) {
-            WorkloadQueryInfo* query_info_ptr = &(list[i]);
-            
_exec_env->runtime_query_statistics_mgr()->get_metric_map(query_info_ptr->query_id,
-                                                                      
query_info_ptr->metric_map);
+        for (const auto& i : list) {
+            auto resource_ctx = i.lock();
+            if (resource_ctx == nullptr) {
+                continue;
+            }
+            WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx);
 
             // 2 get matched policy
             std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>> 
matched_policy_map;
@@ -94,7 +98,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
                 std::shared_lock<std::shared_mutex> read_lock(_policy_lock);
                 for (auto& entity : _id_policy_map) {
                     auto& new_policy = entity.second;
-                    if (new_policy->is_match(query_info_ptr)) {
+                    if (new_policy->is_match(&action_runtime_ctx)) {
                         WorkloadActionType new_policy_type = 
new_policy->get_first_action_type();
                         if (matched_policy_map.find(new_policy_type) == 
matched_policy_map.end() ||
                             new_policy->priority() >
@@ -105,7 +109,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
                 }
             }
 
-            if (matched_policy_map.size() == 0) {
+            if (matched_policy_map.empty()) {
                 continue;
             }
             LOG(INFO) << "[workload_schedule] matched policy size=" << 
matched_policy_map.size();
@@ -128,7 +132,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
 
             // 4 exec policy action
             for (const auto& [key, value] : matched_policy_map) {
-                value->exec_action(query_info_ptr);
+                value->exec_action(&action_runtime_ctx);
             }
         }
     }
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp 
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index 94e32f67810..2049bb29043 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -43,7 +43,7 @@ TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-ConsumeMemory");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size1 = 4 * 1024;
     int64_t size2 = 4 * 1024 * 1024;
@@ -104,7 +104,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
             MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t1);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size1 = 4 * 1024;
     int64_t size2 = 4 * 1024 * 1024;
@@ -177,7 +177,7 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) {
     std::shared_ptr<MemTracker> t3 = 
std::make_shared<MemTracker>("UT-MultiMemTracker3");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t1);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size1 = 4 * 1024;
     int64_t size2 = 4 * 1024 * 1024;
@@ -239,7 +239,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-ReserveMemory");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size1 = 4 * 1024;
     int64_t size2 = 4 * 1024 * 1024;
@@ -337,7 +337,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
             MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size2 = 4 * 1024 * 1024;
     int64_t size3 = size2 * 2;
@@ -396,7 +396,7 @@ TEST_F(ThreadMemTrackerMgrTest, 
NestedSwitchMemTrackerReserveMemory) {
             MemTrackerLimiter::Type::OTHER, 
"UT-NestedSwitchMemTrackerReserveMemory3");
     std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
     rc->memory_context()->set_mem_tracker(t1);
-    rc->workload_group_context()->set_workload_group(workload_group);
+    rc->set_workload_group(workload_group);
 
     int64_t size1 = 4 * 1024;
     int64_t size2 = 4 * 1024 * 1024;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to