This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eb90763a797 [opt](resource) Add `WorkloadSchedPolicyTest` (#48154)
eb90763a797 is described below
commit eb90763a7978bb6ab5f70ec3199174456f4cccb6
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Feb 21 15:25:10 2025 +0800
[opt](resource) Add `WorkloadSchedPolicyTest` (#48154)
### What problem does this PR solve?
```
[==========] Running 4 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 4 tests from WorkloadSchedPolicyTest
[ RUN ] WorkloadSchedPolicyTest.one_policy_one_condition
[ OK ] WorkloadSchedPolicyTest.one_policy_one_condition (51 ms)
[ RUN ] WorkloadSchedPolicyTest.one_policy_mutl_condition
[ OK ] WorkloadSchedPolicyTest.one_policy_mutl_condition (50 ms)
[ RUN ] WorkloadSchedPolicyTest.test_operator
[ OK ] WorkloadSchedPolicyTest.test_operator (0 ms)
[ RUN ] WorkloadSchedPolicyTest.test_wg_id_set
[ OK ] WorkloadSchedPolicyTest.test_wg_id_set (0 ms)
[----------] 4 tests from WorkloadSchedPolicyTest (103 ms total)
[----------] Global test environment tear-down
[==========] 4 tests from 1 test suite ran. (104 ms total)
[ PASSED ] 4 tests.
=== Finished. Gtest output:
/mnt/disk2/zouxinyi/doris/core/be/ut_build_ASAN/gtest_output
```
---
.../runtime/workload_management/memory_context.h | 3 +-
.../runtime/workload_management/task_controller.h | 10 +-
.../workload_management/workload_sched_policy.cpp | 25 +-
be/test/runtime/workload_sched_policy_test.cpp | 286 +++++++++++++++++++++
4 files changed, 306 insertions(+), 18 deletions(-)
diff --git a/be/src/runtime/workload_management/memory_context.h
b/be/src/runtime/workload_management/memory_context.h
index 6caff41f026..e79f1375592 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -83,7 +83,8 @@ public:
int64_t current_memory_bytes() const { return mem_tracker_->consumption();
}
int64_t peak_memory_bytes() const { return
mem_tracker_->peak_consumption(); }
- int64_t max_peak_memory_bytes() const { return
stats_.max_peak_memory_bytes_counter_->value(); }
+ // TODO, use stats_.max_peak_memory_bytes_counter_->value();
+ int64_t max_peak_memory_bytes() const { return
mem_tracker_->peak_consumption(); }
int64_t revoke_attempts() const { return
stats_.revoke_attempts_counter_->value(); }
int64_t revoke_wait_time_ms() const { return
stats_.revoke_wait_time_ms_counter_->value(); }
int64_t revoked_bytes() const { return
stats_.revoked_bytes_counter_->value(); }
diff --git a/be/src/runtime/workload_management/task_controller.h
b/be/src/runtime/workload_management/task_controller.h
index f88d7d50832..9fc86b142a0 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -30,7 +30,10 @@ class TaskController {
ENABLE_FACTORY_CREATOR(TaskController);
public:
- TaskController() { task_id_ = TUniqueId(); };
+ TaskController() {
+ task_id_ = TUniqueId();
+ start_time_ = MonotonicMillis();
+ };
virtual ~TaskController() = default;
bool is_attach_task() { return task_id_ != TUniqueId(); }
@@ -51,10 +54,7 @@ public:
int64_t start_time() const { return start_time_; }
int64_t finish_time() const { return finish_time_; }
- Status running_time(int64_t* running_time_msecs) const {
- *running_time_msecs = finish_time_ - start_time_;
- return Status::OK();
- }
+ int64_t running_time() const { return finish_time_ - start_time_; }
TNetworkAddress fe_addr() { return fe_addr_; }
TQueryType::type query_type() { return query_type_; }
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 597daafa6bc..91d75e2aae1 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -35,16 +35,18 @@ void WorkloadSchedPolicy::init(int64_t id, std::string
name, int version, bool e
_action_list = std::move(action_list);
_wg_id_set = wg_id_set;
- _first_action_type = _action_list[0]->get_action_type();
- if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
- _first_action_type != WorkloadActionType::CANCEL_QUERY) {
- for (int i = 1; i < _action_list.size(); i++) {
- WorkloadActionType cur_action_type =
_action_list[i]->get_action_type();
- // one policy can not both contains move and cancel
- if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP ||
- cur_action_type == WorkloadActionType::CANCEL_QUERY) {
- _first_action_type = cur_action_type;
- break;
+ if (!_action_list.empty()) {
+ _first_action_type = _action_list[0]->get_action_type();
+ if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
+ _first_action_type != WorkloadActionType::CANCEL_QUERY) {
+ for (int i = 1; i < _action_list.size(); i++) {
+ WorkloadActionType cur_action_type =
_action_list[i]->get_action_type();
+ // one policy can not both contains move and cancel
+ if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP
||
+ cur_action_type == WorkloadActionType::CANCEL_QUERY) {
+ _first_action_type = cur_action_type;
+ break;
+ }
}
}
}
@@ -70,8 +72,7 @@ bool
WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim
switch (cond->get_workload_metric_type()) {
case WorkloadMetricType::QUERY_TIME: {
val = std::to_string(
- MonotonicMillis() -
-
action_runtime_ctx->resource_ctx->task_controller()->finish_time());
+
action_runtime_ctx->resource_ctx->task_controller()->running_time());
break;
}
case WorkloadMetricType::SCAN_BYTES: {
diff --git a/be/test/runtime/workload_sched_policy_test.cpp
b/be/test/runtime/workload_sched_policy_test.cpp
new file mode 100644
index 00000000000..e7cfea03695
--- /dev/null
+++ b/be/test/runtime/workload_sched_policy_test.cpp
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy.h"
+
+#include <gen_cpp/BackendService_types.h>
+#include <gtest/gtest.h>
+
+#include "pipeline/task_scheduler.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_management/resource_context.h"
+#include "runtime/workload_management/workload_condition.h"
+#include "util/threadpool.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+
+namespace doris {
+
+class WorkloadSchedPolicyTest : public testing::Test {
+public:
+ WorkloadSchedPolicyTest() = default;
+ ~WorkloadSchedPolicyTest() override = default;
+
+protected:
+ std::unique_ptr<WorkloadCondition>
create_workload_condition(TWorkloadMetricType::type mtype,
+
TCompareOperator::type otype,
+ std::string
value) {
+ TWorkloadCondition cond;
+ cond.metric_name = mtype;
+ cond.op = otype;
+ cond.value = value;
+ return WorkloadConditionFactory::create_workload_condition(&cond);
+ }
+
+ std::unique_ptr<WorkloadAction>
create_workload_action(TWorkloadActionType::type atype) {
+ TWorkloadAction action;
+ action.action = atype;
+ return WorkloadActionFactory::create_workload_action(&action);
+ }
+
+ WorkloadAction::RuntimeContext create_runtime_context() {
+ std::shared_ptr<ResourceContext> resource_ctx =
ResourceContext::create_shared();
+ WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx);
+ return action_runtime_ctx;
+ }
+
+ void SetUp() override {}
+};
+
+TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
+ // 1 empty resource
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+ }
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+
TCompareOperator::type::GREATER, "10"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ }
+
+ // 2 check query time
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+
TCompareOperator::type::GREATER, "10"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ action_runtime_ctx.resource_ctx->task_controller()->finish();
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->task_controller()->running_time();
+ }
+
+ // 3 check scan rows
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+
TCompareOperator::type::GREATER, "100"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(99);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(2);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(-2);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_rows();
+ }
+
+ // 4 check scan bytes
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES,
+
TCompareOperator::type::GREATER, "1000"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(999);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(2);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(-2);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": " <<
action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
+ }
+
+ // 5 check query be memory bytes
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+ cond_ptr_list.push_back(
+
create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES,
+ TCompareOperator::type::GREATER,
"10000"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::shared_ptr<MemTrackerLimiter> mem_tracker =
+
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test");
+
action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
+
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(9999);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": "
+ <<
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(2);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
+ << ": "
+ <<
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(-2);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
+ << ": "
+ <<
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes();
+ }
+}
+
+TEST_F(WorkloadSchedPolicyTest, one_policy_mutl_condition) {
+ // 1. init policy
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+
TCompareOperator::type::GREATER, "10"));
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+
TCompareOperator::type::GREATER, "100"));
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES,
+
TCompareOperator::type::GREATER, "1000"));
+ cond_ptr_list.push_back(
+
create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES,
+ TCompareOperator::type::GREATER,
"10000"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ // 2. is match
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::shared_ptr<MemTrackerLimiter> mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY,
"Test");
+
action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ action_runtime_ctx.resource_ctx->task_controller()->finish();
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(101);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(10001);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+}
+
+TEST_F(WorkloadSchedPolicyTest, test_operator) {
+ // LESS
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+
TCompareOperator::type::LESS, "100"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100);
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ }
+
+ // EQUAL
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+
TCompareOperator::type::EQUAL, "100"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+ }
+}
+
+TEST_F(WorkloadSchedPolicyTest, test_wg_id_set) {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
+
TCompareOperator::type::GREATER_EQUAL, "0"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ std::set<int64_t> wg_id_set;
+ wg_id_set.insert(1);
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ TWorkloadGroupInfo tworkload_group_info;
+ tworkload_group_info.__isset.id = true;
+ tworkload_group_info.id = 1;
+ tworkload_group_info.__isset.version = true;
+ tworkload_group_info.version = 1;
+ WorkloadGroupInfo workload_group_info =
+ WorkloadGroupInfo::parse_topic_info(tworkload_group_info);
+ auto workload_group = std::make_shared<WorkloadGroup>(workload_group_info);
+ action_runtime_ctx.resource_ctx->set_workload_group(workload_group);
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+}
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]