This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eb90763a797 [opt](resource) Add `WorkloadSchedPolicyTest` (#48154)
eb90763a797 is described below

commit eb90763a7978bb6ab5f70ec3199174456f4cccb6
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Fri Feb 21 15:25:10 2025 +0800

[opt](resource) Add `WorkloadSchedPolicyTest` (#48154)

### What problem does this PR solve?

```
[==========] Running 4 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 4 tests from WorkloadSchedPolicyTest
[ RUN ] WorkloadSchedPolicyTest.one_policy_one_condition
[ OK ] WorkloadSchedPolicyTest.one_policy_one_condition (51 ms)
[ RUN ] WorkloadSchedPolicyTest.one_policy_mutl_condition
[ OK ] WorkloadSchedPolicyTest.one_policy_mutl_condition (50 ms)
[ RUN ] WorkloadSchedPolicyTest.test_operator
[ OK ] WorkloadSchedPolicyTest.test_operator (0 ms)
[ RUN ] WorkloadSchedPolicyTest.test_wg_id_set
[ OK ] WorkloadSchedPolicyTest.test_wg_id_set (0 ms)
[----------] 4 tests from WorkloadSchedPolicyTest (103 ms total)
[----------] Global test environment tear-down
[==========] 4 tests from 1 test suite ran. (104 ms total)
[ PASSED ] 4 tests.
=== Finished.
Gtest output: /mnt/disk2/zouxinyi/doris/core/be/ut_build_ASAN/gtest_output ``` --- .../runtime/workload_management/memory_context.h | 3 +- .../runtime/workload_management/task_controller.h | 10 +- .../workload_management/workload_sched_policy.cpp | 25 +- be/test/runtime/workload_sched_policy_test.cpp | 286 +++++++++++++++++++++ 4 files changed, 306 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h index 6caff41f026..e79f1375592 100644 --- a/be/src/runtime/workload_management/memory_context.h +++ b/be/src/runtime/workload_management/memory_context.h @@ -83,7 +83,8 @@ public: int64_t current_memory_bytes() const { return mem_tracker_->consumption(); } int64_t peak_memory_bytes() const { return mem_tracker_->peak_consumption(); } - int64_t max_peak_memory_bytes() const { return stats_.max_peak_memory_bytes_counter_->value(); } + // TODO, use stats_.max_peak_memory_bytes_counter_->value(); + int64_t max_peak_memory_bytes() const { return mem_tracker_->peak_consumption(); } int64_t revoke_attempts() const { return stats_.revoke_attempts_counter_->value(); } int64_t revoke_wait_time_ms() const { return stats_.revoke_wait_time_ms_counter_->value(); } int64_t revoked_bytes() const { return stats_.revoked_bytes_counter_->value(); } diff --git a/be/src/runtime/workload_management/task_controller.h b/be/src/runtime/workload_management/task_controller.h index f88d7d50832..9fc86b142a0 100644 --- a/be/src/runtime/workload_management/task_controller.h +++ b/be/src/runtime/workload_management/task_controller.h @@ -30,7 +30,10 @@ class TaskController { ENABLE_FACTORY_CREATOR(TaskController); public: - TaskController() { task_id_ = TUniqueId(); }; + TaskController() { + task_id_ = TUniqueId(); + start_time_ = MonotonicMillis(); + }; virtual ~TaskController() = default; bool is_attach_task() { return task_id_ != TUniqueId(); } @@ -51,10 +54,7 @@ public: int64_t start_time() const { 
return start_time_; } int64_t finish_time() const { return finish_time_; } - Status running_time(int64_t* running_time_msecs) const { - *running_time_msecs = finish_time_ - start_time_; - return Status::OK(); - } + int64_t running_time() const { return finish_time_ - start_time_; } TNetworkAddress fe_addr() { return fe_addr_; } TQueryType::type query_type() { return query_type_; } diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index 597daafa6bc..91d75e2aae1 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -35,16 +35,18 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool e _action_list = std::move(action_list); _wg_id_set = wg_id_set; - _first_action_type = _action_list[0]->get_action_type(); - if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP && - _first_action_type != WorkloadActionType::CANCEL_QUERY) { - for (int i = 1; i < _action_list.size(); i++) { - WorkloadActionType cur_action_type = _action_list[i]->get_action_type(); - // one policy can not both contains move and cancel - if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP || - cur_action_type == WorkloadActionType::CANCEL_QUERY) { - _first_action_type = cur_action_type; - break; + if (!_action_list.empty()) { + _first_action_type = _action_list[0]->get_action_type(); + if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP && + _first_action_type != WorkloadActionType::CANCEL_QUERY) { + for (int i = 1; i < _action_list.size(); i++) { + WorkloadActionType cur_action_type = _action_list[i]->get_action_type(); + // one policy can not both contains move and cancel + if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP || + cur_action_type == WorkloadActionType::CANCEL_QUERY) { + _first_action_type = cur_action_type; + break; + } } } } @@ -70,8 +72,7 
@@ bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim switch (cond->get_workload_metric_type()) { case WorkloadMetricType::QUERY_TIME: { val = std::to_string( - MonotonicMillis() - - action_runtime_ctx->resource_ctx->task_controller()->finish_time()); + action_runtime_ctx->resource_ctx->task_controller()->running_time()); break; } case WorkloadMetricType::SCAN_BYTES: { diff --git a/be/test/runtime/workload_sched_policy_test.cpp b/be/test/runtime/workload_sched_policy_test.cpp new file mode 100644 index 00000000000..e7cfea03695 --- /dev/null +++ b/be/test/runtime/workload_sched_policy_test.cpp @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/workload_management/workload_sched_policy.h" + +#include <gen_cpp/BackendService_types.h> +#include <gtest/gtest.h> + +#include "pipeline/task_scheduler.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_management/resource_context.h" +#include "runtime/workload_management/workload_condition.h" +#include "util/threadpool.h" +#include "vec/exec/scan/scanner_scheduler.h" + +namespace doris { + +class WorkloadSchedPolicyTest : public testing::Test { +public: + WorkloadSchedPolicyTest() = default; + ~WorkloadSchedPolicyTest() override = default; + +protected: + std::unique_ptr<WorkloadCondition> create_workload_condition(TWorkloadMetricType::type mtype, + TCompareOperator::type otype, + std::string value) { + TWorkloadCondition cond; + cond.metric_name = mtype; + cond.op = otype; + cond.value = value; + return WorkloadConditionFactory::create_workload_condition(&cond); + } + + std::unique_ptr<WorkloadAction> create_workload_action(TWorkloadActionType::type atype) { + TWorkloadAction action; + action.action = atype; + return WorkloadActionFactory::create_workload_action(&action); + } + + WorkloadAction::RuntimeContext create_runtime_context() { + std::shared_ptr<ResourceContext> resource_ctx = ResourceContext::create_shared(); + WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx); + return action_runtime_ctx; + } + + void SetUp() override {} +}; + +TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) { + // 1 empty resource + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + 
EXPECT_TRUE(policy->is_match(&action_runtime_ctx)); + } + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME, + TCompareOperator::type::GREATER, "10")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + } + + // 2 check query time + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME, + TCompareOperator::type::GREATER, "10")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + action_runtime_ctx.resource_ctx->task_controller()->finish(); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->task_controller()->running_time(); + } + + // 3 check scan rows + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + 
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, + TCompareOperator::type::GREATER, "100")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(99); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_rows(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(2); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_rows(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(-2); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_rows(); + } + + // 4 check scan bytes + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES, + TCompareOperator::type::GREATER, "1000")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(999); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << 
action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(2); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(-2); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); + } + + // 5 check query be memory bytes + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back( + create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES, + TCompareOperator::type::GREATER, "10000")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + std::shared_ptr<MemTrackerLimiter> mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test"); + action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker); + action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(9999); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes(); + action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(2); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes(); + action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(-2); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " + << 
action_runtime_ctx.resource_ctx->memory_context()->current_memory_bytes(); + } +} + +TEST_F(WorkloadSchedPolicyTest, one_policy_mutl_condition) { + // 1. init policy + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME, + TCompareOperator::type::GREATER, "10")); + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, + TCompareOperator::type::GREATER, "100")); + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_BYTES, + TCompareOperator::type::GREATER, "1000")); + cond_ptr_list.push_back( + create_workload_condition(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES, + TCompareOperator::type::GREATER, "10000")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + // 2. 
is match + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + std::shared_ptr<MemTrackerLimiter> mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test"); + action_runtime_ctx.resource_ctx->memory_context()->set_mem_tracker(mem_tracker); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + action_runtime_ctx.resource_ctx->task_controller()->finish(); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(101); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + action_runtime_ctx.resource_ctx->memory_context()->mem_tracker()->consume(10001); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)); +} + +TEST_F(WorkloadSchedPolicyTest, test_operator) { + // LESS + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, + TCompareOperator::type::LESS, "100")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + } + + // EQUAL + { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + 
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, + TCompareOperator::type::EQUAL, "100")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + std::set<int64_t> wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + action_runtime_ctx.resource_ctx->io_context()->update_scan_rows(100); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)); + } +} + +TEST_F(WorkloadSchedPolicyTest, test_wg_id_set) { + std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>(); + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, + TCompareOperator::type::GREATER_EQUAL, "0")); + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + std::set<int64_t> wg_id_set; + wg_id_set.insert(1); + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)); + TWorkloadGroupInfo tworkload_group_info; + tworkload_group_info.__isset.id = true; + tworkload_group_info.id = 1; + tworkload_group_info.__isset.version = true; + tworkload_group_info.version = 1; + WorkloadGroupInfo workload_group_info = + WorkloadGroupInfo::parse_topic_info(tworkload_group_info); + auto workload_group = std::make_shared<WorkloadGroup>(workload_group_info); + action_runtime_ctx.resource_ctx->set_workload_group(workload_group); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)); +} +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, 
e-mail: commits-h...@doris.apache.org