This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 44d9aa43154 [opt](resource) step3: Remove WorkloadQueryInfo, replaced by ResourceContext (#47960)
44d9aa43154 is described below

commit 44d9aa43154b4c740a4d3667c75627eb5a256b71
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Mon Feb 17 18:55:19 2025 +0800

    [opt](resource) step3: Remove WorkloadQueryInfo, replaced by ResourceContext (#47960)

    ### What problem does this PR solve?

    Modify `WorkloadSchedPolicy`, `WorkloadSchedPolicyMgr`, `WorkloadAction`
---
 be/src/olap/delta_writer.cpp | 2 +-
 be/src/runtime/fragment_mgr.cpp | 12 +---
 be/src/runtime/fragment_mgr.h | 2 +-
 be/src/runtime/load_channel.cpp | 2 +-
 be/src/runtime/query_context.cpp | 2 +-
 be/src/runtime/query_context.h | 4 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp | 22 ------
 be/src/runtime/runtime_query_statistics_mgr.h | 5 --
 be/src/runtime/thread_context.cpp | 3 +-
 be/src/runtime/thread_context.h | 84 +++++++++-------------
 be/src/runtime/workload_management/cpu_context.cpp | 5 +-
 .../workload_management/resource_context.cpp | 6 +-
 .../runtime/workload_management/resource_context.h | 10 +--
 .../workload_management/workload_action.cpp | 15 ++--
 .../runtime/workload_management/workload_action.h | 29 ++++++--
 .../workload_management/workload_group_context.h | 45 ------------
 .../workload_management/workload_query_info.h | 37 ----------
 .../workload_management/workload_sched_policy.cpp | 52 ++++++++++----
 .../workload_management/workload_sched_policy.h | 10 +--
 .../workload_sched_policy_mgr.cpp | 26 ++++---
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 12 ++--
 21 files changed, 149 insertions(+), 236 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 69c2256a5eb..992dac27ec8 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -106,7 +106,7 @@ Status BaseDeltaWriter::init() {
 auto* t_ctx = doris::thread_context(true); std::shared_ptr<WorkloadGroup> 
wg_sptr = nullptr; if (t_ctx && t_ctx->is_attach_task()) { - wg_sptr = t_ctx->resource_ctx()->workload_group_context()->workload_group(); + wg_sptr = t_ctx->resource_ctx()->workload_group(); } RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_memtable_writer->init( diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 4dfa966b20b..009dc34bb96 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -79,7 +79,6 @@ #include "runtime/types.h" #include "runtime/workload_group/workload_group.h" #include "runtime/workload_group/workload_group_manager.h" -#include "runtime/workload_management/workload_query_info.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" #include "util/debug_points.h" @@ -1399,18 +1398,13 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, return merge_status; } -void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) { +void FragmentMgr::get_runtime_query_info( + std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) { _query_ctx_map.apply( [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status { for (auto iter = map.begin(); iter != map.end();) { if (auto q_ctx = iter->second.lock()) { - WorkloadQueryInfo workload_query_info; - workload_query_info.query_id = print_id(iter->first); - workload_query_info.tquery_id = iter->first; - workload_query_info.wg_id = q_ctx->workload_group() == nullptr - ? 
-1 - : q_ctx->workload_group()->id(); - query_info_list->push_back(workload_query_info); + _resource_ctx_list->push_back(q_ctx->resource_ctx()); iter++; } else { iter = map.erase(iter); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 78f117a15db..9e37b7811f0 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -176,7 +176,7 @@ public: std::string dump_pipeline_tasks(int64_t duration = 0); std::string dump_pipeline_tasks(TUniqueId& query_id); - void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list); + void get_runtime_query_info(std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list); Status get_realtime_exec_status(const TUniqueId& query_id, TReportExecStatusParams* exec_status); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 99f41899215..ce10666d84c 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -62,7 +62,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig wg_ptr = ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id); if (wg_ptr != nullptr) { wg_ptr->add_mem_tracker_limiter(mem_tracker); - _resource_ctx->workload_group_context()->set_workload_group(wg_ptr); + _resource_ctx->set_workload_group(wg_ptr); } } } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index a6cccc22091..9334db576ff 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -351,7 +351,7 @@ ThreadPool* QueryContext::get_memtable_flush_pool() { } void QueryContext::set_workload_group(WorkloadGroupPtr& wg) { - _resource_ctx->workload_group_context()->set_workload_group(wg); + _resource_ctx->set_workload_group(wg); // Should add query first, then the workload group will not be deleted. 
// see task_group_manager::delete_workload_group_by_ids workload_group()->add_mem_tracker_limiter(query_mem_tracker()); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index c02cf97bf1b..f8515f762d3 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -260,9 +260,7 @@ public: bool is_nereids() const { return _is_nereids; } - WorkloadGroupPtr workload_group() const { - return _resource_ctx->workload_group_context()->workload_group(); - } + WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const { return _resource_ctx->memory_context()->mem_tracker(); } diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index f09558f7b9c..aa1e8546e02 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -453,28 +453,6 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { } } -void RuntimeQueryStatisticsMgr::get_metric_map( - std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map) { - std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); - if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end()) { - auto* resource_ctx = _resource_contexts_map.at(query_id).get(); - metric_map.emplace( - WorkloadMetricType::QUERY_TIME, - std::to_string(MonotonicMillis() - resource_ctx->task_controller()->finish_time())); - metric_map.emplace(WorkloadMetricType::SCAN_ROWS, - std::to_string(resource_ctx->io_context()->scan_rows())); - metric_map.emplace(WorkloadMetricType::SCAN_BYTES, - std::to_string(resource_ctx->io_context()->scan_bytes())); - metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, - std::to_string(resource_ctx->memory_context()->current_memory_bytes())); - } else { - metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1"); - 
metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1"); - metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1"); - metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1"); - } -} - void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) { std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 71a93ee9220..0bcdd647373 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -56,11 +56,6 @@ public: void report_runtime_query_statistics(); - // used for workload scheduler policy - // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into WorkloadGroupMgr. - void get_metric_map(std::string query_id, - std::map<WorkloadMetricType, std::string>& metric_map); - // used for backend_active_tasks void get_active_be_tasks_block(vectorized::Block* block); diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 09394c8a721..266515ae2a6 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -64,8 +64,7 @@ SwitchResourceContext::SwitchResourceContext(const std::shared_ptr<ResourceConte old_resource_ctx_ = thread_context()->resource_ctx(); thread_context()->resource_ctx_ = rc; thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - rc->memory_context()->mem_tracker(), - rc->workload_group_context()->workload_group()); + rc->memory_context()->mem_tracker(), rc->workload_group()); } } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 05cef5d5f9b..fbdbe397d40 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -111,54 +111,41 @@ __VA_ARGS__; \ } while (0) -#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \ - 
std::shared_ptr<IOThrottle> iot = nullptr; \ - auto* t_ctx = doris::thread_context(true); \ - if (t_ctx && t_ctx->is_attach_task() && \ - t_ctx->resource_ctx()->workload_group_context()->workload_group() != nullptr) { \ - iot = t_ctx->resource_ctx() \ - ->workload_group_context() \ - ->workload_group() \ - ->get_local_scan_io_throttle(data_dir); \ - } \ - if (iot) { \ - iot->acquire(-1); \ - } \ - Defer defer { \ - [&]() { \ - if (iot) { \ - iot->update_next_io_time(*bytes_read); \ - t_ctx->resource_ctx() \ - ->workload_group_context() \ - ->workload_group() \ - ->update_local_scan_io(data_dir, *bytes_read); \ - } \ - } \ +#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \ + std::shared_ptr<IOThrottle> iot = nullptr; \ + auto* t_ctx = doris::thread_context(true); \ + if (t_ctx && t_ctx->is_attach_task() && t_ctx->resource_ctx()->workload_group() != nullptr) { \ + iot = t_ctx->resource_ctx()->workload_group()->get_local_scan_io_throttle(data_dir); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + t_ctx->resource_ctx()->workload_group()->update_local_scan_io(data_dir, \ + *bytes_read); \ + } \ + } \ } -#define LIMIT_REMOTE_SCAN_IO(bytes_read) \ - std::shared_ptr<IOThrottle> iot = nullptr; \ - auto* t_ctx = doris::thread_context(true); \ - if (t_ctx && t_ctx->is_attach_task() && \ - t_ctx->resource_ctx()->workload_group_context()->workload_group() != nullptr) { \ - iot = t_ctx->resource_ctx() \ - ->workload_group_context() \ - ->workload_group() \ - ->get_remote_scan_io_throttle(); \ - } \ - if (iot) { \ - iot->acquire(-1); \ - } \ - Defer defer { \ - [&]() { \ - if (iot) { \ - iot->update_next_io_time(*bytes_read); \ - t_ctx->resource_ctx() \ - ->workload_group_context() \ - ->workload_group() \ - ->update_remote_scan_io(*bytes_read); \ - } \ - } \ +#define LIMIT_REMOTE_SCAN_IO(bytes_read) \ + std::shared_ptr<IOThrottle> iot = nullptr; \ + auto* t_ctx = 
doris::thread_context(true); \ + if (t_ctx && t_ctx->is_attach_task() && t_ctx->resource_ctx()->workload_group() != nullptr) { \ + iot = t_ctx->resource_ctx()->workload_group()->get_remote_scan_io_throttle(); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + t_ctx->resource_ctx()->workload_group()->update_remote_scan_io(*bytes_read); \ + } \ + } \ } namespace doris { @@ -195,9 +182,8 @@ public: // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. DCHECK(resource_ctx_ == nullptr); resource_ctx_ = rc; - thread_mem_tracker_mgr->attach_limiter_tracker( - rc->memory_context()->mem_tracker(), - rc->workload_group_context()->workload_group()); + thread_mem_tracker_mgr->attach_limiter_tracker(rc->memory_context()->mem_tracker(), + rc->workload_group()); thread_mem_tracker_mgr->enable_wait_gc(); } diff --git a/be/src/runtime/workload_management/cpu_context.cpp b/be/src/runtime/workload_management/cpu_context.cpp index b6bbcc0da8a..ee7a0024b0c 100644 --- a/be/src/runtime/workload_management/cpu_context.cpp +++ b/be/src/runtime/workload_management/cpu_context.cpp @@ -25,9 +25,8 @@ namespace doris { void CPUContext::update_cpu_cost_ms(int64_t delta) const { stats_.cpu_cost_ms_counter_->update(delta); - if (resource_ctx_ != nullptr && - resource_ctx_->workload_group_context()->workload_group() != nullptr) { - resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta); + if (resource_ctx_ != nullptr && resource_ctx_->workload_group() != nullptr) { + resource_ctx_->workload_group()->update_cpu_time(delta); } } diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp index 765e4615505..f78ac56e9f4 100644 --- a/be/src/runtime/workload_management/resource_context.cpp +++ b/be/src/runtime/workload_management/resource_context.cpp @@ -48,7 +48,11 @@ 
void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c statistics->__set_scan_bytes_from_remote_storage( io_context()->scan_bytes_from_remote_storage()); statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage()); - statistics->__set_workload_group_id(workload_group_context()->workload_group_id()); + if (workload_group() != nullptr) { + statistics->__set_workload_group_id(workload_group()->id()); + } else { + statistics->__set_workload_group_id(-1); + } } } // namespace doris diff --git a/be/src/runtime/workload_management/resource_context.h b/be/src/runtime/workload_management/resource_context.h index 0c163f515cb..64d59289db8 100644 --- a/be/src/runtime/workload_management/resource_context.h +++ b/be/src/runtime/workload_management/resource_context.h @@ -28,7 +28,6 @@ #include "runtime/workload_management/io_context.h" #include "runtime/workload_management/memory_context.h" #include "runtime/workload_management/task_controller.h" -#include "runtime/workload_management/workload_group_context.h" #include "util/runtime_profile.h" namespace doris { @@ -46,7 +45,6 @@ public: cpu_context_ = CPUContext::create_unique(); memory_context_ = MemoryContext::create_unique(); io_context_ = IOContext::create_unique(); - workload_group_context_ = WorkloadGroupContext::create_unique(); task_controller_ = TaskController::create_unique(); cpu_context_->set_resource_ctx(this); @@ -59,8 +57,8 @@ public: CPUContext* cpu_context() const { return cpu_context_.get(); } MemoryContext* memory_context() const { return memory_context_.get(); } IOContext* io_context() const { return io_context_.get(); } - WorkloadGroupContext* workload_group_context() const { return workload_group_context_.get(); } TaskController* task_controller() const { return task_controller_.get(); } + WorkloadGroupPtr workload_group() const { return _workload_group; } void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) { cpu_context_ = 
std::move(cpu_context); @@ -74,12 +72,10 @@ public: io_context_ = std::move(io_context); io_context_->set_resource_ctx(this); } - void set_workload_group_context(std::unique_ptr<WorkloadGroupContext> wg_context) { - workload_group_context_ = std::move(wg_context); - } void set_task_controller(std::unique_ptr<TaskController> task_controller) { task_controller_ = std::move(task_controller); } + void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; } RuntimeProfile* profile() { return const_cast<RuntimeProfile*>(resource_profile_.get().get()); } @@ -109,9 +105,9 @@ private: std::unique_ptr<CPUContext> cpu_context_ = nullptr; std::unique_ptr<MemoryContext> memory_context_ = nullptr; std::unique_ptr<IOContext> io_context_ = nullptr; - std::unique_ptr<WorkloadGroupContext> workload_group_context_ = nullptr; std::unique_ptr<TaskController> task_controller_ = nullptr; + WorkloadGroupPtr _workload_group = nullptr; MultiVersion<RuntimeProfile> resource_profile_; }; diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp index 77042b074fd..f9e42b43afa 100644 --- a/be/src/runtime/workload_management/workload_action.cpp +++ b/be/src/runtime/workload_management/workload_action.cpp @@ -21,18 +21,19 @@ namespace doris { -void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) { +void WorkloadActionCancelQuery::exec(WorkloadAction::RuntimeContext* action_runtime_ctx) { std::stringstream msg; - msg << "query " << query_info->query_id - << " cancelled by workload policy: " << query_info->policy_name - << ", id:" << query_info->policy_id << ", " << query_info->cond_eval_msg; + msg << "query " << print_id(action_runtime_ctx->resource_ctx->task_controller()->task_id()) + << " cancelled by workload policy: " << action_runtime_ctx->policy_name + << ", id:" << action_runtime_ctx->policy_id << ", " << action_runtime_ctx->cond_eval_msg; std::string msg_str = msg.str(); LOG(INFO) << 
"[workload_schedule]" << msg_str; - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id, - Status::InternalError<false>(msg_str)); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + action_runtime_ctx->resource_ctx->task_controller()->task_id(), + Status::InternalError<false>(msg_str)); } -void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) { +void WorkloadActionMoveQuery::exec(WorkloadAction::RuntimeContext* action_runtime_ctx) { LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name; }; diff --git a/be/src/runtime/workload_management/workload_action.h b/be/src/runtime/workload_management/workload_action.h index 785acc73c3a..4943b13a5d5 100644 --- a/be/src/runtime/workload_management/workload_action.h +++ b/be/src/runtime/workload_management/workload_action.h @@ -17,36 +17,53 @@ #pragma once -#include "runtime/workload_management/workload_query_info.h" +#include <gen_cpp/BackendService_types.h> +#include <glog/logging.h> namespace doris { +class ResourceContext; + enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 }; class WorkloadAction { public: + // only used as a temporary variable in `WorkloadSchedPolicyMgr::_schedule_workload`. 
+ struct RuntimeContext { + public: + RuntimeContext(const std::shared_ptr<ResourceContext>& ctx) : resource_ctx(ctx) {} + + int64_t policy_id; + std::string policy_name; + std::string cond_eval_msg; + + std::shared_ptr<ResourceContext> resource_ctx; + }; + WorkloadAction() = default; virtual ~WorkloadAction() = default; - virtual void exec(WorkloadQueryInfo* query_info) = 0; + virtual void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) = 0; virtual WorkloadActionType get_action_type() = 0; }; class WorkloadActionCancelQuery : public WorkloadAction { public: - void exec(WorkloadQueryInfo* query_info) override; + void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override; - WorkloadActionType get_action_type() override { return CANCEL_QUERY; } + WorkloadActionType get_action_type() override { return WorkloadActionType::CANCEL_QUERY; } }; //todo(wb) implement it class WorkloadActionMoveQuery : public WorkloadAction { public: WorkloadActionMoveQuery(std::string wg_name) : _wg_name(wg_name) {} - void exec(WorkloadQueryInfo* query_info) override; + void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override; - WorkloadActionType get_action_type() override { return MOVE_QUERY_TO_GROUP; } + WorkloadActionType get_action_type() override { + return WorkloadActionType::MOVE_QUERY_TO_GROUP; + } private: std::string _wg_name; diff --git a/be/src/runtime/workload_management/workload_group_context.h b/be/src/runtime/workload_management/workload_group_context.h deleted file mode 100644 index 38cad310925..00000000000 --- a/be/src/runtime/workload_management/workload_group_context.h +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "common/factory_creator.h" -#include "runtime/workload_group/workload_group.h" - -namespace doris { - -class WorkloadGroupContext { - ENABLE_FACTORY_CREATOR(WorkloadGroupContext); - -public: - WorkloadGroupContext() = default; - virtual ~WorkloadGroupContext() = default; - - int64_t workload_group_id() { - if (workload_group() != nullptr) { - return workload_group()->id(); - } - return -1; - } - WorkloadGroupPtr workload_group() { return _workload_group; } - void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; } - -protected: - WorkloadGroupPtr _workload_group = nullptr; -}; - -} // namespace doris diff --git a/be/src/runtime/workload_management/workload_query_info.h b/be/src/runtime/workload_management/workload_query_info.h deleted file mode 100644 index 16151eec390..00000000000 --- a/be/src/runtime/workload_management/workload_query_info.h +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <map> - -#include "runtime/workload_management/workload_condition.h" - -namespace doris { - -class WorkloadQueryInfo { -public: - std::map<WorkloadMetricType, std::string> metric_map; - TUniqueId tquery_id; - std::string query_id; - int64_t wg_id; - int64_t policy_id; - std::string policy_name {""}; - std::string cond_eval_msg {""}; -}; - -} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index 63b9362bc21..597daafa6bc 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -17,6 +17,9 @@ #include "runtime/workload_management/workload_sched_policy.h" +#include "runtime/workload_management/resource_context.h" +#include "util/time.h" + namespace doris { void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool enabled, @@ -47,26 +50,47 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool e } } -bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { +bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtime_ctx) const { if (!_enabled) { return false; } // 1 when policy has no group(_wg_id_set.size() < 0), it should match all query // 2 when policy has group, it can only match the query which has the same group - if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 || - _wg_id_set.find(query_info_ptr->wg_id) == 
_wg_id_set.end())) { + if (!_wg_id_set.empty() && + (action_runtime_ctx->resource_ctx->workload_group() == nullptr || + _wg_id_set.find(action_runtime_ctx->resource_ctx->workload_group()->id()) == + _wg_id_set.end())) { return false; } - auto& metric_val_map = query_info_ptr->metric_map; - std::string cond_eval_msg = ""; - for (auto& cond : _condition_list) { - if (metric_val_map.find(cond->get_workload_metric_type()) == metric_val_map.end()) { + std::string cond_eval_msg; + for (const auto& cond : _condition_list) { + std::string val; + switch (cond->get_workload_metric_type()) { + case WorkloadMetricType::QUERY_TIME: { + val = std::to_string( + MonotonicMillis() - + action_runtime_ctx->resource_ctx->task_controller()->finish_time()); + break; + } + case WorkloadMetricType::SCAN_BYTES: { + val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes()); + break; + } + case WorkloadMetricType::SCAN_ROWS: { + val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows()); + break; + } + case WorkloadMetricType::QUERY_MEMORY_BYTES: { + val = std::to_string( + action_runtime_ctx->resource_ctx->memory_context()->current_memory_bytes()); + break; + } + default: return false; } - std::string val = metric_val_map.at(cond->get_workload_metric_type()); if (!cond->eval(val)) { return false; } @@ -74,15 +98,15 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { cond->get_metric_value_string() + "), "; } cond_eval_msg = cond_eval_msg.substr(0, cond_eval_msg.size() - 2); - query_info_ptr->cond_eval_msg = cond_eval_msg; + action_runtime_ctx->cond_eval_msg = cond_eval_msg; return true; } -void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) { - for (int i = 0; i < _action_list.size(); i++) { - query_info->policy_id = this->_id; - query_info->policy_name = this->_name; - _action_list[i]->exec(query_info); +void WorkloadSchedPolicy::exec_action(WorkloadAction::RuntimeContext* action_runtime_ctx) { + for 
(auto& action : _action_list) { + action_runtime_ctx->policy_id = this->_id; + action_runtime_ctx->policy_name = this->_name; + action->exec(action_runtime_ctx); } } diff --git a/be/src/runtime/workload_management/workload_sched_policy.h b/be/src/runtime/workload_management/workload_sched_policy.h index 6554634d9af..ce1e4b6655e 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.h +++ b/be/src/runtime/workload_management/workload_sched_policy.h @@ -34,16 +34,16 @@ public: std::vector<std::unique_ptr<WorkloadCondition>> condition_list, std::vector<std::unique_ptr<WorkloadAction>> action_list); - bool enabled() { return _enabled; } - int priority() { return _priority; } + bool enabled() const { return _enabled; } + int priority() const { return _priority; } - bool is_match(WorkloadQueryInfo* query_info); + bool is_match(WorkloadAction::RuntimeContext* action_runtime_ctx) const; WorkloadActionType get_first_action_type() { return _first_action_type; } - void exec_action(WorkloadQueryInfo* query_info); + void exec_action(WorkloadAction::RuntimeContext* action_runtime_ctx); - int version() { return _version; } + int version() const { return _version; } private: int64_t _id; diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp index 4690ed1d4f2..4802704c136 100644 --- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -17,8 +17,10 @@ #include "runtime/workload_management/workload_sched_policy_mgr.h" +#include <memory> + #include "runtime/fragment_mgr.h" -#include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/workload_management/resource_context.h" namespace doris { @@ -75,18 +77,20 @@ void WorkloadSchedPolicyMgr::update_workload_sched_policy( void WorkloadSchedPolicyMgr::_schedule_workload() { while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) { - // 
1 get query info - std::vector<WorkloadQueryInfo> list; + // 1 get query resource context + std::vector<std::weak_ptr<ResourceContext>> list; _exec_env->fragment_mgr()->get_runtime_query_info(&list); // todo: add timer - if (list.size() == 0) { + if (list.empty()) { continue; } - for (int i = 0; i < list.size(); i++) { - WorkloadQueryInfo* query_info_ptr = &(list[i]); - _exec_env->runtime_query_statistics_mgr()->get_metric_map(query_info_ptr->query_id, - query_info_ptr->metric_map); + for (const auto& i : list) { + auto resource_ctx = i.lock(); + if (resource_ctx == nullptr) { + continue; + } + WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx); // 2 get matched policy std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>> matched_policy_map; @@ -94,7 +98,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { std::shared_lock<std::shared_mutex> read_lock(_policy_lock); for (auto& entity : _id_policy_map) { auto& new_policy = entity.second; - if (new_policy->is_match(query_info_ptr)) { + if (new_policy->is_match(&action_runtime_ctx)) { WorkloadActionType new_policy_type = new_policy->get_first_action_type(); if (matched_policy_map.find(new_policy_type) == matched_policy_map.end() || new_policy->priority() > @@ -105,7 +109,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { } } - if (matched_policy_map.size() == 0) { + if (matched_policy_map.empty()) { continue; } LOG(INFO) << "[workload_schedule] matched policy size=" << matched_policy_map.size(); @@ -128,7 +132,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { // 4 exec policy action for (const auto& [key, value] : matched_policy_map) { - value->exec_action(query_info_ptr); + value->exec_action(&action_runtime_ctx); } } } diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index 94e32f67810..2049bb29043 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ 
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -43,7 +43,7 @@ TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ConsumeMemory"); std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024; @@ -104,7 +104,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) { MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3"); std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t1); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024; @@ -177,7 +177,7 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { std::shared_ptr<MemTracker> t3 = std::make_shared<MemTracker>("UT-MultiMemTracker3"); std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t1); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024; @@ -239,7 +239,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ReserveMemory"); std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024; @@ -337,7 +337,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory"); std::shared_ptr<ResourceContext> rc = 
ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size2 = 4 * 1024 * 1024; int64_t size3 = size2 * 2; @@ -396,7 +396,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory3"); std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared(); rc->memory_context()->set_mem_tracker(t1); - rc->workload_group_context()->set_workload_group(workload_group); + rc->set_workload_group(workload_group); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org