This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eb90763a797 [opt](resource) Add `WorkloadSchedPolicyTest` (#48154)
eb90763a797 is described below

commit eb90763a7978bb6ab5f70ec3199174456f4cccb6
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Fri Feb 21 15:25:10 2025 +0800

    [opt](resource) Add `WorkloadSchedPolicyTest` (#48154)
    
    ### What problem does this PR solve?
    
    ```
    [==========] Running 4 tests from 1 test suite.
    [----------] Global test environment set-up.
    [----------] 4 tests from WorkloadSchedPolicyTest
    [ RUN      ] WorkloadSchedPolicyTest.one_policy_one_condition
    [       OK ] WorkloadSchedPolicyTest.one_policy_one_condition (51 ms)
    [ RUN      ] WorkloadSchedPolicyTest.one_policy_mutl_condition
    [       OK ] WorkloadSchedPolicyTest.one_policy_mutl_condition (50 ms)
    [ RUN      ] WorkloadSchedPolicyTest.test_operator
    [       OK ] WorkloadSchedPolicyTest.test_operator (0 ms)
    [ RUN      ] WorkloadSchedPolicyTest.test_wg_id_set
    [       OK ] WorkloadSchedPolicyTest.test_wg_id_set (0 ms)
    [----------] 4 tests from WorkloadSchedPolicyTest (103 ms total)
    
    [----------] Global test environment tear-down
    [==========] 4 tests from 1 test suite ran. (104 ms total)
    [  PASSED  ] 4 tests.
    === Finished. Gtest output: /mnt/disk2/zouxinyi/doris/core/be/ut_build_ASAN/gtest_output
    ```
---
 .../runtime/workload_management/memory_context.h   |   3 +-
 .../runtime/workload_management/task_controller.h  |  10 +-
 .../workload_management/workload_sched_policy.cpp  |  25 +-
 be/test/runtime/workload_sched_policy_test.cpp     | 286 +++++++++++++++++++++
 4 files changed, 306 insertions(+), 18 deletions(-)

diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h
index 6caff41f026..e79f1375592 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -83,7 +83,8 @@ public:
 
     int64_t current_memory_bytes() const { return mem_tracker_->consumption(); 
}
     int64_t peak_memory_bytes() const { return 
mem_tracker_->peak_consumption(); }
-    int64_t max_peak_memory_bytes() const { return 
stats_.max_peak_memory_bytes_counter_->value(); }
+    // TODO, use stats_.max_peak_memory_bytes_counter_->value();
+    int64_t max_peak_memory_bytes() const { return 
mem_tracker_->peak_consumption(); }
     int64_t revoke_attempts() const { return 
stats_.revoke_attempts_counter_->value(); }
     int64_t revoke_wait_time_ms() const { return 
stats_.revoke_wait_time_ms_counter_->value(); }
     int64_t revoked_bytes() const { return 
stats_.revoked_bytes_counter_->value(); }
diff --git a/be/src/runtime/workload_management/task_controller.h b/be/src/runtime/workload_management/task_controller.h
index f88d7d50832..9fc86b142a0 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -30,7 +30,10 @@ class TaskController {
     ENABLE_FACTORY_CREATOR(TaskController);
 
 public:
-    TaskController() { task_id_ = TUniqueId(); };
+    TaskController() {
+        task_id_ = TUniqueId();
+        start_time_ = MonotonicMillis();
+    };
     virtual ~TaskController() = default;
 
     bool is_attach_task() { return task_id_ != TUniqueId(); }
@@ -51,10 +54,7 @@ public:
 
     int64_t start_time() const { return start_time_; }
     int64_t finish_time() const { return finish_time_; }
-    Status running_time(int64_t* running_time_msecs) const {
-        *running_time_msecs = finish_time_ - start_time_;
-        return Status::OK();
-    }
+    int64_t running_time() const { return finish_time_ - start_time_; }
     TNetworkAddress fe_addr() { return fe_addr_; }
     TQueryType::type query_type() { return query_type_; }
 
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 597daafa6bc..91d75e2aae1 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -35,16 +35,18 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool e
     _action_list = std::move(action_list);
     _wg_id_set = wg_id_set;
 
-    _first_action_type = _action_list[0]->get_action_type();
-    if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
-        _first_action_type != WorkloadActionType::CANCEL_QUERY) {
-        for (int i = 1; i < _action_list.size(); i++) {
-            WorkloadActionType cur_action_type = 
_action_list[i]->get_action_type();
-            // one policy can not both contains move and cancel
-            if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP ||
-                cur_action_type == WorkloadActionType::CANCEL_QUERY) {
-                _first_action_type = cur_action_type;
-                break;
+    if (!_action_list.empty()) {
+        _first_action_type = _action_list[0]->get_action_type();
+        if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
+            _first_action_type != WorkloadActionType::CANCEL_QUERY) {
+            for (int i = 1; i < _action_list.size(); i++) {
+                WorkloadActionType cur_action_type = 
_action_list[i]->get_action_type();
+                // one policy can not both contains move and cancel
+                if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP 
||
+                    cur_action_type == WorkloadActionType::CANCEL_QUERY) {
+                    _first_action_type = cur_action_type;
+                    break;
+                }
             }
         }
     }
@@ -70,8 +72,7 @@ bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim
         switch (cond->get_workload_metric_type()) {
         case WorkloadMetricType::QUERY_TIME: {
             val = std::to_string(
-                    MonotonicMillis() -
-                    
action_runtime_ctx->resource_ctx->task_controller()->finish_time());
+                    
action_runtime_ctx->resource_ctx->task_controller()->running_time());
             break;
         }
         case WorkloadMetricType::SCAN_BYTES: {
diff --git a/be/test/runtime/workload_sched_policy_test.cpp b/be/test/runtime/workload_sched_policy_test.cpp
new file mode 100644
index 00000000000..e7cfea03695
--- /dev/null
+++ b/be/test/runtime/workload_sched_policy_test.cpp
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy.h"
+
+#include <gen_cpp/BackendService_types.h>
+#include <gtest/gtest.h>
+
+#include "pipeline/task_scheduler.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_management/resource_context.h"
+#include "runtime/workload_management/workload_condition.h"
+#include "util/threadpool.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+
+namespace doris {
+
+class WorkloadSchedPolicyTest : public testing::Test {
+public:
+    WorkloadSchedPolicyTest() = default;
+    ~WorkloadSchedPolicyTest() override = default;
+
+protected:
+    std::unique_ptr<WorkloadCondition> 
create_workload_condition(TWorkloadMetricType::type mtype,
+                                                                 
TCompareOperator::type otype,
+                                                                 std::string 
value) {
+        TWorkloadCondition cond;
+        cond.metric_name = mtype;
+        cond.op = otype;
+        cond.value = value;
+        return WorkloadConditionFactory::create_workload_condition(&cond);
+    }
+
+    std::unique_ptr<WorkloadAction> 
create_workload_action(TWorkloadActionType::type atype) {
+        TWorkloadAction action;
+        action.action = atype;
+        return WorkloadActionFactory::create_workload_action(&action);
+    }
+
+    WorkloadAction::RuntimeContext create_runtime_context() {
+        std::shared_ptr<ResourceContext> resource_ctx = 
ResourceContext::create_shared();
+        WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx);
+        return action_runtime_ctx;
+    }
+
+    void SetUp() override {}
+};
+
+TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
+    // 1 empty resource
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+    }
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+                                                          
TCompareOperator::type::GREATER, "10"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    }
+
+    // 2 check query time
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+                                                          
TCompareOperator::type::GREATER, "10"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        action_runtime_ctx.resource_ctx->task_controller()->finish();
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->task_controller()->running_time();
+    }
+
+    // 3 check scan rows
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+                                                          
TCompareOperator::type::GREATER, "100"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(99);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(2);
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(-2);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+    }
+
+    // 4 check scan bytes
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES,
+                                                          
TCompareOperator::type::GREATER, "1000"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(999);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(2);
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(-2);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+    }
+
+    // 5 check query be memory bytes
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        cond_ptr_list.push_back(
+                
create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES,
+                                          TCompareOperator::type::GREATER, 
"10000"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        std::shared_ptr<MemTrackerLimiter> mem_tracker =
+                
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test");
+        
action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
+        
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(9999);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": "
+                << 
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+        
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(2);
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+                << ": "
+                << 
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+        
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(-2);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+                << ": "
+                << 
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+    }
+}
+
+TEST_F(WorkloadSchedPolicyTest, one_policy_mutl_condition) {
+    // 1. init policy
+    std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+    std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+                                                      
TCompareOperator::type::GREATER, "10"));
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+                                                      
TCompareOperator::type::GREATER, "100"));
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES,
+                                                      
TCompareOperator::type::GREATER, "1000"));
+    cond_ptr_list.push_back(
+            
create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES,
+                                      TCompareOperator::type::GREATER, 
"10000"));
+    std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+    
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+    std::set<int64_t> wg_id_set;
+    policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                 std::move(action_ptr_list));
+
+    // 2. is match
+    WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+    std::shared_ptr<MemTrackerLimiter> mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, 
"Test");
+    
action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    action_runtime_ctx.resource_ctx->task_controller()->finish();
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(101);
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001);
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(10001);
+    EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+}
+
+TEST_F(WorkloadSchedPolicyTest, test_operator) {
+    // LESS
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+                                                          
TCompareOperator::type::LESS, "100"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100);
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    }
+
+    // EQUAL
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+                                                          
TCompareOperator::type::EQUAL, "100"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+        action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100);
+        EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+    }
+}
+
+TEST_F(WorkloadSchedPolicyTest, test_wg_id_set) {
+    std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+    std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+                                                      
TCompareOperator::type::GREATER_EQUAL, "0"));
+    std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+    std::set<int64_t> wg_id_set;
+    wg_id_set.insert(1);
+    policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                 std::move(action_ptr_list));
+
+    WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    TWorkloadGroupInfo tworkload_group_info;
+    tworkload_group_info.__isset.id = true;
+    tworkload_group_info.id = 1;
+    tworkload_group_info.__isset.version = true;
+    tworkload_group_info.version = 1;
+    WorkloadGroupInfo workload_group_info =
+            WorkloadGroupInfo::parse_topic_info(tworkload_group_info);
+    auto workload_group = std::make_shared<WorkloadGroup>(workload_group_info);
+    action_runtime_ctx.resource_ctx->set_workload_group(workload_group);
+    EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+}
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to