This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 31b3be456c5 add workload scheduler in be (#29116) 31b3be456c5 is described below commit 31b3be456c53c8c0ec7bb64ecca42866b71e86e0 Author: wangbo <wan...@apache.org> AuthorDate: Thu Dec 28 15:04:22 2023 +0800 add workload scheduler in be (#29116) --- be/src/agent/agent_server.cpp | 8 ++ be/src/agent/workload_sched_policy_listener.cpp | 78 ++++++++++++ .../src/agent/workload_sched_policy_listener.h | 30 ++--- be/src/runtime/exec_env.h | 4 + be/src/runtime/exec_env_init.cpp | 8 ++ be/src/runtime/fragment_mgr.cpp | 20 ++++ be/src/runtime/fragment_mgr.h | 3 + be/src/runtime/query_context.h | 2 + .../workload_management/workload_action.cpp | 28 ++--- .../runtime/workload_management/workload_action.h | 68 +++++++++++ .../workload_management/workload_comparator.h | 77 ++++++++++++ .../workload_management/workload_condition.cpp | 59 ++++++++++ .../workload_management/workload_condition.h | 97 +++++++++++++++ .../workload_management/workload_query_info.h | 26 ++-- .../workload_management/workload_sched_policy.cpp | 73 ++++++++++++ .../workload_management/workload_sched_policy.h | 59 ++++++++++ .../workload_sched_policy_mgr.cpp | 131 +++++++++++++++++++++ .../workload_sched_policy_mgr.h | 43 ++++--- .../main/java/org/apache/doris/catalog/Env.java | 3 + .../doris/common/publish/TopicPublisherThread.java | 7 +- .../common/publish/WorkloadGroupPublisher.java | 9 +- .../workloadschedpolicy/WorkloadSchedPolicy.java | 67 +++++++++++ .../WorkloadSchedPolicyMgr.java | 17 +++ .../WorkloadSchedPolicyPublisher.java} | 17 ++- gensrc/thrift/BackendService.thrift | 44 ++++++- 25 files changed, 907 insertions(+), 71 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 9902a0ad726..a3b18c53567 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -31,6 +31,7 @@ #include "agent/topic_subscriber.h" #include "agent/utils.h" #include "agent/workload_group_listener.h" +#include "agent/workload_sched_policy_listener.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -72,6 +73,13 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) LOG(INFO) << "Register workload group listener"; _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP, std::move(wg_listener)); + + std::unique_ptr<TopicListener> policy_listener = + std::make_unique<WorkloadschedPolicyListener>(exec_env); + LOG(INFO) << "Register workload scheduler policy listener"; + _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_SCHED_POLICY, + std::move(policy_listener)); + #endif } diff --git a/be/src/agent/workload_sched_policy_listener.cpp b/be/src/agent/workload_sched_policy_listener.cpp new file mode 100644 index 00000000000..461fd2cbb0f --- /dev/null +++ b/be/src/agent/workload_sched_policy_listener.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "agent/workload_sched_policy_listener.h" + +#include "runtime/workload_management/workload_action.h" +#include "runtime/workload_management/workload_condition.h" +#include "runtime/workload_management/workload_sched_policy.h" +#include "runtime/workload_management/workload_sched_policy_mgr.h" + +namespace doris { + +void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) { + std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map; + for (const TopicInfo& topic_info : topic_info_list) { + if (!topic_info.__isset.workload_sched_policy) { + continue; + } + + TWorkloadSchedPolicy tpolicy = topic_info.workload_sched_policy; + // some metric or action can not exec in be, then need skip + bool need_skip_current_policy = false; + + std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list; + for (TWorkloadCondition& cond : tpolicy.condition_list) { + std::unique_ptr<WorkloadCondition> cond_ptr = + WorkloadConditionFactory::create_workload_condition(&cond); + if (cond_ptr == nullptr) { + need_skip_current_policy = true; + break; + } + cond_ptr_list.push_back(std::move(cond_ptr)); + } + if (need_skip_current_policy) { + continue; + } + + std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list; + for (TWorkloadAction& action : tpolicy.action_list) { + std::unique_ptr<WorkloadAction> action_ptr = + WorkloadActionFactory::create_workload_action(&action); + if (action_ptr == nullptr) { + need_skip_current_policy = true; + break; + } + action_ptr_list.push_back(std::move(action_ptr)); + } + if (need_skip_current_policy) { + continue; + } + + std::shared_ptr<WorkloadSchedPolicy> policy_ptr = std::make_shared<WorkloadSchedPolicy>(); + policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, tpolicy.enabled, + tpolicy.priority, std::move(cond_ptr_list), std::move(action_ptr_list)); + policy_map.emplace(tpolicy.id, std::move(policy_ptr)); + } + size_t new_policy_size = policy_map.size(); + if (new_policy_size > 0) { + _exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map)); + } + LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size=" + << new_policy_size; +} +} // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/be/src/agent/workload_sched_policy_listener.h similarity index 59% copy from fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java copy to be/src/agent/workload_sched_policy_listener.h index 6c5ce9e4c11..f1410b502fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/be/src/agent/workload_sched_policy_listener.h @@ -15,23 +15,23 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.publish; +#pragma once -import org.apache.doris.catalog.Env; -import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TTopicInfoType; +#include <glog/logging.h> -public class WorkloadGroupPublisher implements TopicPublisher { +#include "agent/topic_listener.h" +#include "runtime/exec_env.h" - private Env env; +namespace doris { - public WorkloadGroupPublisher(Env env) { - this.env = env; - } +class WorkloadschedPolicyListener : public TopicListener { +public: + WorkloadschedPolicyListener(ExecEnv* exec_env) : _exec_env(exec_env) {} - @Override - public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); - } -} + void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override; + +private: + ExecEnv* _exec_env = nullptr; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4c1060891f5..f8a52ad1ec4 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -63,6 +63,7 @@ class InvertedIndexSearcherCache; class InvertedIndexQueryCache; } // namespace segment_v2 +class WorkloadSchedPolicyMgr; class BfdParser; class BrokerMgr; template <class T> @@ -153,6 +154,7 @@ public: pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; } pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _with_group_task_scheduler; } taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } + WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; } // using template to simplify client cache management template <typename T> @@ -378,6 +380,8 @@ private: std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _with_group_block_scheduler; doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; + + WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 4df69d67c03..48d229b9e21 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -76,6 +76,7 @@ #include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" #include "runtime/user_function_cache.h" +#include "runtime/workload_management/workload_sched_policy_mgr.h" #include "service/backend_options.h" #include "service/backend_service.h" #include "service/point_query_executor.h" @@ -194,6 +195,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, .set_max_threads(1) .set_max_queue_size(1000000) .build(&_lazy_release_obj_pool)); + + _workload_sched_mgr = new WorkloadSchedPolicyMgr(); + _workload_sched_mgr->start(this); + init_file_cache_factory(); RETURN_IF_ERROR(init_pipeline_task_scheduler()); _task_group_manager = new taskgroup::TaskGroupManager(); @@ -519,6 +524,8 @@ void ExecEnv::destroy() { SAFE_STOP(_group_commit_mgr); // _routine_load_task_executor should be stopped before _new_load_stream_mgr. SAFE_STOP(_routine_load_task_executor); + // stop workload scheduler + SAFE_STOP(_workload_sched_mgr); // stop pipline step 1, non-cgroup execution SAFE_SHUTDOWN(_without_group_block_scheduler.get()); SAFE_STOP(_without_group_task_scheduler); @@ -593,6 +600,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_bfd_parser); SAFE_DELETE(_result_cache); SAFE_DELETE(_fragment_mgr); + SAFE_DELETE(_workload_sched_mgr); SAFE_DELETE(_task_group_manager); SAFE_DELETE(_with_group_task_scheduler); SAFE_DELETE(_without_group_task_scheduler); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 65e1ba475ae..9256c3bccfd 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -73,6 +73,7 @@ #include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" #include "runtime/types.h" +#include "runtime/workload_management/workload_query_info.h" #include "service/backend_options.h" #include "util/debug_util.h" #include "util/doris_metrics.h" @@ -1567,4 +1568,23 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag } } +void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) { + { + std::lock_guard<std::mutex> lock(_lock); + // todo: use monotonic time + VecDateTimeValue now = VecDateTimeValue::local_time(); + for (const auto& q : _query_ctx_map) { + WorkloadQueryInfo workload_query_info; + workload_query_info.query_id = print_id(q.first); + workload_query_info.tquery_id = q.first; + + uint64_t query_time_millisecond = q.second->query_time(now) * 1000; + workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME, + std::to_string(query_time_millisecond)); + // todo, add scan rows, scan bytes + query_info_list->push_back(workload_query_info); + } + } +} + } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 21d85503803..f9a6d4f72ed 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -64,6 +64,7 @@ class TPipelineInstanceParams; class TScanColumnDesc; class TScanOpenParams; class Thread; +class WorkloadQueryInfo; std::string to_load_error_http_path(const std::string& file_name); @@ -153,6 +154,8 @@ public: std::string dump_pipeline_tasks(); + void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list); + private: void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index a230fd653e8..203c5b6e3f4 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -93,6 +93,8 @@ public: return false; } + int64_t query_time(VecDateTimeValue& now) { return now.second_diff(_start_time); } + void set_thread_token(int concurrency, bool is_serial) { _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token( is_serial ? ThreadPool::ExecutionMode::SERIAL diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/be/src/runtime/workload_management/workload_action.cpp similarity index 55% copy from fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java copy to be/src/runtime/workload_management/workload_action.cpp index 6c5ce9e4c11..39916bc7cc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/be/src/runtime/workload_management/workload_action.cpp @@ -15,23 +15,21 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.publish; +#include "runtime/workload_management/workload_action.h" -import org.apache.doris.catalog.Env; -import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TTopicInfoType; +#include "runtime/fragment_mgr.h" -public class WorkloadGroupPublisher implements TopicPublisher { +namespace doris { - private Env env; +void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) { + LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id; + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, + std::string("query canceled by workload scheduler")); +} - public WorkloadGroupPublisher(Env env) { - this.env = env; - } +void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) { + LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name; +}; - @Override - public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); - } -} +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_action.h b/be/src/runtime/workload_management/workload_action.h new file mode 100644 index 00000000000..29c01320b78 --- /dev/null +++ b/be/src/runtime/workload_management/workload_action.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/workload_management/workload_query_info.h" + +namespace doris { + +enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 }; + +class WorkloadAction { +public: + WorkloadAction() = default; + virtual ~WorkloadAction() = default; + + virtual void exec(WorkloadQueryInfo* query_info) = 0; + + virtual WorkloadActionType get_action_type() = 0; +}; + +class WorkloadActionCancelQuery : public WorkloadAction { +public: + void exec(WorkloadQueryInfo* query_info) override; + + WorkloadActionType get_action_type() override { return CANCEL_QUERY; } +}; + +//todo(wb) implement it +class WorkloadActionMoveQuery : public WorkloadAction { +public: + WorkloadActionMoveQuery(std::string wg_name) : _wg_name(wg_name) {} + void exec(WorkloadQueryInfo* query_info) override; + + WorkloadActionType get_action_type() override { return MOVE_QUERY_TO_GROUP; } + +private: + std::string _wg_name; +}; + +class WorkloadActionFactory { +public: + static std::unique_ptr<WorkloadAction> create_workload_action(TWorkloadAction* action) { + if (TWorkloadActionType::type::MOVE_QUERY_TO_GROUP == action->action) { + return std::make_unique<WorkloadActionMoveQuery>(action->action_args); + } else if (TWorkloadActionType::type::CANCEL_QUERY == action->action) { + return std::make_unique<WorkloadActionCancelQuery>(); + } + LOG(ERROR) << "not find a action " << action->action; + return nullptr; + } +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_comparator.h b/be/src/runtime/workload_management/workload_comparator.h new file mode 100644 index 00000000000..00981a89dc8 --- /dev/null +++ b/be/src/runtime/workload_management/workload_comparator.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "common/status.h" + +namespace doris { + +enum WorkloadCompareOperator { EQUAL, GREATER, GREATER_EQUAL, LESS, LESS_EQUAL, INVALID_OP }; + +class WorkloadCompareUtils { +public: + static WorkloadCompareOperator get_workload_compare_operator(TCompareOperator::type t_op) { + if (TCompareOperator::type::EQUAL == t_op) { + return EQUAL; + } else if (TCompareOperator::type::GREATER == t_op) { + return GREATER; + } else if (TCompareOperator::type::GREATER_EQUAL == t_op) { + return GREATER_EQUAL; + } else if (TCompareOperator::type::LESS == t_op) { + return LESS; + } else if (TCompareOperator::type::LESS_EQUAL == t_op) { + return LESS_EQUAL; + } + LOG(ERROR) << "can not find a valid op "; + return INVALID_OP; + } + + static bool compare_signed_integer(WorkloadCompareOperator op, int64_t first_val, + int64_t second_val) { + switch (op) { + case EQUAL: + return first_val == second_val; + case GREATER: + return first_val > second_val; + case GREATER_EQUAL: + return first_val >= second_val; + case LESS: + return first_val < second_val; + case LESS_EQUAL: + return first_val <= second_val; + default: + LOG(ERROR) << "unexpected signed integer compare operator " << op; + return false; + } + } + + static bool compare_string(WorkloadCompareOperator op, std::string first_val, + std::string second_val) { + switch (op) { + case EQUAL: + return first_val == second_val; + default: + LOG(ERROR) << "unexpected string compare operator " << op; + return false; + } + } +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp new file mode 100644 index 00000000000..dff6f2adc24 --- /dev/null +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_management/workload_condition.h" + +namespace doris { + +// query time +WorkloadConditionQueryTime::WorkloadConditionQueryTime(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _query_time = std::stol(str_val); +} + +bool WorkloadConditionQueryTime::eval(std::string str_val) { + int64_t query_time_args = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, query_time_args, _query_time); +} + +// scan rows +WorkloadConditionScanRows::WorkloadConditionScanRows(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _scan_rows = std::stol(str_val); +} + +bool WorkloadConditionScanRows::eval(std::string str_val) { + int64_t scan_rows_args = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, scan_rows_args, _scan_rows); +} + +// scan bytes +WorkloadConditionScanBytes::WorkloadConditionScanBytes(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _scan_bytes = std::stol(str_val); +} + +// todo(wb): need handle invalid input value +bool WorkloadConditionScanBytes::eval(std::string str_val) { + int64_t scan_bytes_args = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h new file mode 100644 index 00000000000..3486742c968 --- /dev/null +++ b/be/src/runtime/workload_management/workload_condition.h @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/BackendService_types.h> + +#include "runtime/workload_management/workload_comparator.h" + +namespace doris { + +enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES }; + +class WorkloadCondition { +public: + WorkloadCondition() = default; + virtual ~WorkloadCondition() = default; + + virtual bool eval(std::string str_val) = 0; + + virtual WorkloadMetricType get_workload_metric_type() = 0; +}; + +class WorkloadConditionQueryTime : public WorkloadCondition { +public: + WorkloadConditionQueryTime(WorkloadCompareOperator op, std::string str_val); + + bool eval(std::string str_val) override; + + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::QUERY_TIME; + } + +private: + int64_t _query_time; + WorkloadCompareOperator _op; +}; + +class WorkloadConditionScanRows : public WorkloadCondition { +public: + WorkloadConditionScanRows(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { return WorkloadMetricType::SCAN_ROWS; } + +private: + int64_t _scan_rows; + WorkloadCompareOperator _op; +}; + +class WorkloadConditionScanBytes : public WorkloadCondition { +public: + WorkloadConditionScanBytes(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::SCAN_BYTES; + } + +private: + int64_t _scan_bytes; + WorkloadCompareOperator _op; +}; + +class WorkloadConditionFactory { +public: + static std::unique_ptr<WorkloadCondition> create_workload_condition( + TWorkloadCondition* t_cond) { + WorkloadCompareOperator op = + WorkloadCompareUtils::get_workload_compare_operator(t_cond->op); + std::string str_val = t_cond->value; + TWorkloadMetricType::type metric_name = t_cond->metric_name; + if (TWorkloadMetricType::type::QUERY_TIME == metric_name) { + return std::make_unique<WorkloadConditionQueryTime>(op, str_val); + } else if (TWorkloadMetricType::type::SCAN_ROWS == metric_name) { + return std::make_unique<WorkloadConditionScanRows>(op, str_val); + } else if (TWorkloadMetricType::type::SCAN_BYTES == metric_name) { + return std::make_unique<WorkloadConditionScanBytes>(op, str_val); + } + LOG(ERROR) << "not find a metric name " << metric_name; + return nullptr; + } +}; + +} // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/be/src/runtime/workload_management/workload_query_info.h similarity index 59% copy from fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java copy to be/src/runtime/workload_management/workload_query_info.h index 6c5ce9e4c11..9c24e9dee8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/be/src/runtime/workload_management/workload_query_info.h @@ -15,23 +15,19 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.publish; +#pragma once -import org.apache.doris.catalog.Env; -import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TTopicInfoType; +#include <map> -public class WorkloadGroupPublisher implements TopicPublisher { +#include "runtime/workload_management/workload_condition.h" - private Env env; +namespace doris { - public WorkloadGroupPublisher(Env env) { - this.env = env; - } +class WorkloadQueryInfo { +public: + std::map<WorkloadMetricType, std::string> metric_map; + TUniqueId tquery_id; + std::string query_id; +}; - @Override - public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); - } -} +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp new file mode 100644 index 00000000000..bc543a7d770 --- /dev/null +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_management/workload_sched_policy.h" + +namespace doris { + +void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool enabled, + int priority, + std::vector<std::unique_ptr<WorkloadCondition>> condition_list, + std::vector<std::unique_ptr<WorkloadAction>> action_list) { + _id = id; + _name = name; + _version = version; + _enabled = enabled; + _priority = priority; + _condition_list = std::move(condition_list); + _action_list = std::move(action_list); + + _first_action_type = _action_list[0]->get_action_type(); + if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP && + _first_action_type != WorkloadActionType::CANCEL_QUERY) { + for (int i = 1; i < _action_list.size(); i++) { + WorkloadActionType cur_action_type = _action_list[i]->get_action_type(); + // one policy can not both contains move and cancel + if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP || + cur_action_type == WorkloadActionType::CANCEL_QUERY) { + _first_action_type = cur_action_type; + break; + } + } + } +} + +bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { + if (!_enabled) { + return false; + } + auto& metric_val_map = query_info_ptr->metric_map; + for (auto& cond : _condition_list) { + if (metric_val_map.find(cond->get_workload_metric_type()) == metric_val_map.end()) { + return false; + } + + std::string val = metric_val_map.at(cond->get_workload_metric_type()); + if (!cond->eval(val)) { + return false; + } + } + return true; +} + +void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) { + for (int i = 0; i < _action_list.size(); i++) { + _action_list[i]->exec(query_info); + } +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.h b/be/src/runtime/workload_management/workload_sched_policy.h new file mode 100644 index 00000000000..82f42b9a0b4 --- /dev/null +++ b/be/src/runtime/workload_management/workload_sched_policy.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "runtime/workload_management/workload_action.h" +#include "runtime/workload_management/workload_condition.h" + +namespace doris { + +class WorkloadSchedPolicy { +public: + WorkloadSchedPolicy() = default; + ~WorkloadSchedPolicy() = default; + + void init(int64_t id, std::string name, int version, bool enabled, int priority, + std::vector<std::unique_ptr<WorkloadCondition>> condition_list, + std::vector<std::unique_ptr<WorkloadAction>> action_list); + + bool enabled() { return _enabled; } + int priority() { return _priority; } + + bool is_match(WorkloadQueryInfo* query_info); + + WorkloadActionType get_first_action_type() { return _first_action_type; } + + void exec_action(WorkloadQueryInfo* query_info); + + int version() { return _version; } + +private: + int64_t _id; + std::string _name; + int _version; + bool _enabled; + int _priority; + + std::vector<std::unique_ptr<WorkloadCondition>> _condition_list; + std::vector<std::unique_ptr<WorkloadAction>> _action_list; + + WorkloadActionType _first_action_type; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp new file mode 100644 index 00000000000..8a30b5395eb --- /dev/null +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_management/workload_sched_policy_mgr.h" + +#include "runtime/fragment_mgr.h" + +namespace doris { + +void WorkloadSchedPolicyMgr::start(ExecEnv* exec_env) { + _stop_latch.reset(1); + _exec_env = exec_env; + + Status st; + st = Thread::create( + "workload", "workload_scheduler", [this]() { this->_schedule_workload(); }, &_thread); + if (!st.ok()) { + LOG(WARNING) << "create workload scheduler thread failed"; + } else { + LOG(INFO) << "start workload scheduler "; + } +} + +void WorkloadSchedPolicyMgr::stop() { + std::lock_guard<std::shared_mutex> write_lock(_stop_lock); + if (_stop_latch.count() == 0) { + LOG(INFO) << "workload schedule manager is already stopped. "; + return; + } + _stop_latch.count_down(); + if (_thread) { + _thread->join(); + } + LOG(INFO) << "workload schedule manager stopped, thread is finished. "; +} + +void WorkloadSchedPolicyMgr::update_workload_sched_policy( + std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> new_policy_map) { + std::lock_guard<std::shared_mutex> write_lock(_policy_lock); + // 1 upsert + for (const auto& [id, policy] : new_policy_map) { + if (_id_policy_map.find(id) == _id_policy_map.end()) { + _id_policy_map.emplace(id, policy); + } else { + if (policy->version() > _id_policy_map.at(id)->version()) { + _id_policy_map[id] = policy; + } + } + } + + // 2 delete + for (auto iter = _id_policy_map.begin(); iter != _id_policy_map.end();) { + if (new_policy_map.find(iter->first) == new_policy_map.end()) { + iter = _id_policy_map.erase(iter); + } else { + iter++; + } + } +} + +void WorkloadSchedPolicyMgr::_schedule_workload() { + while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) { + // 1 get query info + std::vector<WorkloadQueryInfo> list; + _exec_env->fragment_mgr()->get_runtime_query_info(&list); + // todo: add timer + LOG(INFO) << "[workload_schedule] get query list size=" << list.size(); + + for (int i = 0; i < list.size(); i++) { + WorkloadQueryInfo* query_info_ptr = &(list[i]); + // 2 get matched policy + std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>> matched_policy_map; + { + std::shared_lock<std::shared_mutex> read_lock(_policy_lock); + for (auto& entity : _id_policy_map) { + auto& new_policy = entity.second; + if (new_policy->is_match(query_info_ptr)) { + WorkloadActionType new_policy_type = new_policy->get_first_action_type(); + if (matched_policy_map.find(new_policy_type) == matched_policy_map.end() || + new_policy->priority() > + matched_policy_map.at(new_policy_type)->priority()) { + matched_policy_map[new_policy_type] = new_policy; + } + } + } + } + + if (matched_policy_map.size() == 0) { + continue; + } + LOG(INFO) << "[workload_schedule] matched policy size=" << matched_policy_map.size(); + // 3 check action conflicts + if (matched_policy_map.find(WorkloadActionType::MOVE_QUERY_TO_GROUP) != + matched_policy_map.end() && + matched_policy_map.find(WorkloadActionType::CANCEL_QUERY) != + matched_policy_map.end()) { + // compare priority + int move_prio = + matched_policy_map.at(WorkloadActionType::MOVE_QUERY_TO_GROUP)->priority(); + int cancel_prio = + matched_policy_map.at(WorkloadActionType::CANCEL_QUERY)->priority(); + if (cancel_prio >= move_prio) { + matched_policy_map.erase(WorkloadActionType::MOVE_QUERY_TO_GROUP); + } else { + matched_policy_map.erase(WorkloadActionType::CANCEL_QUERY); + } + } + + // 4 exec policy action + for (const auto& [key, value] : matched_policy_map) { + value->exec_action(query_info_ptr); + } + } + } +} + +} // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/be/src/runtime/workload_management/workload_sched_policy_mgr.h similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java copy to be/src/runtime/workload_management/workload_sched_policy_mgr.h index 6c5ce9e4c11..52b41eacf4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.h @@ -15,23 +15,36 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.publish; +#pragma once -import org.apache.doris.catalog.Env; -import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TTopicInfoType; +#include "runtime/exec_env.h" +#include "runtime/workload_management/workload_sched_policy.h" +#include "util/countdown_latch.h" -public class WorkloadGroupPublisher implements TopicPublisher { +namespace doris { - private Env env; +class WorkloadSchedPolicyMgr { +public: + WorkloadSchedPolicyMgr() : _stop_latch(0) {} + ~WorkloadSchedPolicyMgr() = default; - public WorkloadGroupPublisher(Env env) { - this.env = env; - } + void start(ExecEnv* exec_env); - @Override - public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); - } -} + void stop(); + + void update_workload_sched_policy( + std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map); + +private: + void _schedule_workload(); + + std::shared_mutex _policy_lock; + std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> _id_policy_map; + + std::shared_mutex _stop_lock; + CountDownLatch _stop_latch; + scoped_refptr<Thread> _thread; + ExecEnv* _exec_env; +}; + +}; // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index cd77f70f59c..1ad131abb0a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -230,6 +230,7 @@ import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher; import org.apache.doris.scheduler.manager.TransientTaskManager; import org.apache.doris.scheduler.registry.ExportTaskRegister; import org.apache.doris.service.ExecuteEnv; @@ -998,6 +999,8 @@ public class Env { TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); topicPublisherThread.addToTopicPublisherList(wgPublisher); + WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this); + topicPublisherThread.addToTopicPublisherList(wpPublisher); topicPublisherThread.start(); workloadGroupMgr.startUpdateThread(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java index 616c8a30b56..db5158e24f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -59,9 +59,6 @@ public class TopicPublisherThread extends MasterDaemon { @Override protected void runAfterCatalogReady() { - if (!Config.enable_workload_group) { - return; - } LOG.info("begin publish topic info"); // step 1: get all publish topic info TPublishTopicRequest request = new TPublishTopicRequest(); @@ -69,6 +66,10 @@ public class TopicPublisherThread extends MasterDaemon { topicPublisher.getTopicInfo(request); } + if (request.getTopicMap().size() == 0) { + return; + } + // step 2: publish topic info to all be Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values(); AckResponseHandler handler = new AckResponseHandler(nodesToPublish); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java index 6c5ce9e4c11..e81ccb06d3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java @@ -20,6 +20,9 @@ package org.apache.doris.common.publish; import org.apache.doris.catalog.Env; import org.apache.doris.thrift.TPublishTopicRequest; import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; + +import java.util.List; public class WorkloadGroupPublisher implements TopicPublisher { @@ -31,7 +34,9 @@ public class WorkloadGroupPublisher implements TopicPublisher { @Override public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); + List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo(); + if (list.size() > 0) { + req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 827c2367133..d514ea62d2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -22,8 +22,16 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCompareOperator; +import org.apache.doris.thrift.TWorkloadAction; +import org.apache.doris.thrift.TWorkloadActionType; +import org.apache.doris.thrift.TWorkloadCondition; +import org.apache.doris.thrift.TWorkloadMetricType; +import org.apache.doris.thrift.TWorkloadSchedPolicy; +import org.apache.doris.thrift.TopicInfo; import com.esotericsoftware.minlog.Log; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; @@ -41,6 +49,23 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>() .add(ENABLED).add(PRIORITY).build(); + // used for convert fe type to thrift type + private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP + = new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>() + .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME).build(); + private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP + = new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>() + .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) + .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); + + private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP + = new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>() + .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) + .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) + .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) + .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) + .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); + @SerializedName(value = "id") long id; @SerializedName(value = "name") @@ -173,6 +198,48 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { return actionMetaList; } + public TopicInfo toTopicInfo() { + TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy(); + tPolicy.setId(id); + tPolicy.setName(name); + tPolicy.setVersion(version); + tPolicy.setPriority(priority); + tPolicy.setEnabled(enabled); + + List<TWorkloadCondition> condList = new ArrayList(); + for (WorkloadConditionMeta cond : conditionMetaList) { + TWorkloadCondition tCond = new TWorkloadCondition(); + TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName); + if (metricType == null) { + return null; + } + tCond.setMetricName(metricType); + tCond.setOp(OP_MAP.get(cond.op)); + tCond.setValue(cond.value); + condList.add(tCond); + } + + List<TWorkloadAction> actionList = new ArrayList(); + for (WorkloadActionMeta action : actionMetaList) { + TWorkloadAction tAction = new TWorkloadAction(); + TWorkloadActionType tActionType = ACTION_MAP.get(action.action); + if (tActionType == null) { + return null; + } + tAction.setAction(tActionType); + tAction.setActionArgs(action.actionArgs); + actionList.add(tAction); + } + + tPolicy.setConditionList(condList); + tPolicy.setActionList(actionList); + + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setWorkloadSchedPolicy(tPolicy); + + return topicInfo; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 346e34796c7..45ba3a35de4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -35,6 +35,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -412,6 +413,22 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { lock.writeLock().unlock(); } + public List<TopicInfo> getPublishTopicInfoList() { + List<TopicInfo> topicInfoList = new ArrayList(); + readLock(); + try { + for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) { + TopicInfo tInfo = entry.getValue().toTopicInfo(); + if (tInfo != null) { + topicInfoList.add(tInfo); + } + } + } finally { + readUnlock(); + } + return topicInfoList; + } + public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) { insertWorkloadSchedPolicy(policy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java similarity index 68% copy from fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java copy to fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java index 6c5ce9e4c11..5083d183e6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java @@ -15,23 +15,30 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.publish; +package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; +import org.apache.doris.common.publish.TopicPublisher; import org.apache.doris.thrift.TPublishTopicRequest; import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; -public class WorkloadGroupPublisher implements TopicPublisher { +import java.util.List; + +public class WorkloadSchedPolicyPublisher implements TopicPublisher { private Env env; - public WorkloadGroupPublisher(Env env) { + public WorkloadSchedPolicyPublisher(Env env) { this.env = env; } @Override public void getTopicInfo(TPublishTopicRequest req) { - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, - env.getWorkloadGroupMgr().getPublishTopicInfo()); + List<TopicInfo> list = env.getWorkloadSchedPolicyMgr().getPublishTopicInfoList(); + if (list.size() > 0) { + req.putToTopicMap(TTopicInfoType.WORKLOAD_SCHED_POLICY, list); + } } + } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 855b7489711..2c5b199fc17 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -163,6 +163,7 @@ struct TQueryIngestBinlogResult { enum TTopicInfoType { WORKLOAD_GROUP MOVE_QUERY_TO_GROUP + WORKLOAD_SCHED_POLICY } struct TWorkloadGroupInfo { @@ -178,12 +179,53 @@ struct TWorkloadGroupInfo { struct TWorkloadMoveQueryToGroupAction { 1: optional Types.TUniqueId query_id - 2: optional i64 workload_group_id; + 2: optional i64 workload_group_id +} + +enum TWorkloadMetricType { + QUERY_TIME + SCAN_ROWS + SCAN_BYTES +} + +enum TCompareOperator { + EQUAL + GREATER + GREATER_EQUAL + LESS + LESS_EQUAL +} + +struct TWorkloadCondition { + 1: optional TWorkloadMetricType metric_name + 2: optional TCompareOperator op + 3: optional string value +} + +enum TWorkloadActionType { + MOVE_QUERY_TO_GROUP + CANCEL_QUERY +} + +struct TWorkloadAction { + 1: optional TWorkloadActionType action + 2: optional string action_args +} + +struct TWorkloadSchedPolicy { + 1: optional i64 id + 2: optional string name + 3: optional i32 version + 4: optional i32 priority + 5: optional bool enabled + 6: optional list<TWorkloadCondition> condition_list + 7: optional list<TWorkloadAction> action_list } struct TopicInfo { 1: optional TWorkloadGroupInfo workload_group_info 2: optional TWorkloadMoveQueryToGroupAction move_action + 3: optional TWorkloadSchedPolicy workload_sched_policy } struct TPublishTopicRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org