This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 31b3be456c5 add workload scheduler in be (#29116)
31b3be456c5 is described below

commit 31b3be456c53c8c0ec7bb64ecca42866b71e86e0
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Dec 28 15:04:22 2023 +0800

    add workload scheduler in be (#29116)
---
 be/src/agent/agent_server.cpp                      |   8 ++
 be/src/agent/workload_sched_policy_listener.cpp    |  78 ++++++++++++
 .../src/agent/workload_sched_policy_listener.h     |  30 ++---
 be/src/runtime/exec_env.h                          |   4 +
 be/src/runtime/exec_env_init.cpp                   |   8 ++
 be/src/runtime/fragment_mgr.cpp                    |  20 ++++
 be/src/runtime/fragment_mgr.h                      |   3 +
 be/src/runtime/query_context.h                     |   2 +
 .../workload_management/workload_action.cpp        |  28 ++---
 .../runtime/workload_management/workload_action.h  |  68 +++++++++++
 .../workload_management/workload_comparator.h      |  77 ++++++++++++
 .../workload_management/workload_condition.cpp     |  59 ++++++++++
 .../workload_management/workload_condition.h       |  97 +++++++++++++++
 .../workload_management/workload_query_info.h      |  26 ++--
 .../workload_management/workload_sched_policy.cpp  |  73 ++++++++++++
 .../workload_management/workload_sched_policy.h    |  59 ++++++++++
 .../workload_sched_policy_mgr.cpp                  | 131 +++++++++++++++++++++
 .../workload_sched_policy_mgr.h                    |  43 ++++---
 .../main/java/org/apache/doris/catalog/Env.java    |   3 +
 .../doris/common/publish/TopicPublisherThread.java |   7 +-
 .../common/publish/WorkloadGroupPublisher.java     |   9 +-
 .../workloadschedpolicy/WorkloadSchedPolicy.java   |  67 +++++++++++
 .../WorkloadSchedPolicyMgr.java                    |  17 +++
 .../WorkloadSchedPolicyPublisher.java}             |  17 ++-
 gensrc/thrift/BackendService.thrift                |  44 ++++++-
 25 files changed, 907 insertions(+), 71 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9902a0ad726..a3b18c53567 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -31,6 +31,7 @@
 #include "agent/topic_subscriber.h"
 #include "agent/utils.h"
 #include "agent/workload_group_listener.h"
+#include "agent/workload_sched_policy_listener.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
@@ -72,6 +73,13 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
     LOG(INFO) << "Register workload group listener";
     
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
                                          std::move(wg_listener));
+
+    std::unique_ptr<TopicListener> policy_listener =
+            std::make_unique<WorkloadschedPolicyListener>(exec_env);
+    LOG(INFO) << "Register workload scheduler policy listener";
+    
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_SCHED_POLICY,
+                                         std::move(policy_listener));
+
 #endif
 }
 
diff --git a/be/src/agent/workload_sched_policy_listener.cpp 
b/be/src/agent/workload_sched_policy_listener.cpp
new file mode 100644
index 00000000000..461fd2cbb0f
--- /dev/null
+++ b/be/src/agent/workload_sched_policy_listener.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "agent/workload_sched_policy_listener.h"
+
+#include "runtime/workload_management/workload_action.h"
+#include "runtime/workload_management/workload_condition.h"
+#include "runtime/workload_management/workload_sched_policy.h"
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
+
+namespace doris {
+
+// Builds WorkloadSchedPolicy objects from the received topic entries and hands them to the
+// WorkloadSchedPolicyMgr of the ExecEnv.
+// A policy is dropped as a whole when it has no action, or when one of its conditions or
+// actions cannot be created (the factories return nullptr in that case).
+// The manager is only updated when at least one policy was built.
+void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
+    std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map;
+    for (const TopicInfo& topic_info : topic_info_list) {
+        if (!topic_info.__isset.workload_sched_policy) {
+            continue;
+        }
+
+        // Work on a copy: the factories below take non-const pointers.
+        TWorkloadSchedPolicy tpolicy = topic_info.workload_sched_policy;
+
+        // WorkloadSchedPolicy::init() reads the first action unconditionally, so a policy
+        // without any action must never reach it.
+        if (tpolicy.action_list.empty()) {
+            LOG(WARNING) << "[workload_schedule]skip policy " << tpolicy.name
+                         << " because it has no action";
+            continue;
+        }
+
+        // some metric or action can not exec in be, then need skip
+        bool need_skip_current_policy = false;
+
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        for (TWorkloadCondition& cond : tpolicy.condition_list) {
+            std::unique_ptr<WorkloadCondition> cond_ptr =
+                    WorkloadConditionFactory::create_workload_condition(&cond);
+            if (cond_ptr == nullptr) {
+                need_skip_current_policy = true;
+                break;
+            }
+            cond_ptr_list.push_back(std::move(cond_ptr));
+        }
+        if (need_skip_current_policy) {
+            continue;
+        }
+
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        for (TWorkloadAction& action : tpolicy.action_list) {
+            std::unique_ptr<WorkloadAction> action_ptr =
+                    WorkloadActionFactory::create_workload_action(&action);
+            if (action_ptr == nullptr) {
+                need_skip_current_policy = true;
+                break;
+            }
+            action_ptr_list.push_back(std::move(action_ptr));
+        }
+        if (need_skip_current_policy) {
+            continue;
+        }
+
+        std::shared_ptr<WorkloadSchedPolicy> policy_ptr = std::make_shared<WorkloadSchedPolicy>();
+        policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, tpolicy.enabled,
+                         tpolicy.priority, std::move(cond_ptr_list), std::move(action_ptr_list));
+        policy_map.emplace(tpolicy.id, std::move(policy_ptr));
+    }
+    // Read the size before policy_map is moved from.
+    size_t new_policy_size = policy_map.size();
+    if (new_policy_size > 0) {
+        _exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
+    }
+    LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size="
+              << new_policy_size;
+}
+} // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 b/be/src/agent/workload_sched_policy_listener.h
similarity index 59%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/agent/workload_sched_policy_listener.h
index 6c5ce9e4c11..f1410b502fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/agent/workload_sched_policy_listener.h
@@ -15,23 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.publish;
+#pragma once
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include <glog/logging.h>
 
-public class WorkloadGroupPublisher implements TopicPublisher {
+#include "agent/topic_listener.h"
+#include "runtime/exec_env.h"
 
-    private Env env;
+namespace doris {
 
-    public WorkloadGroupPublisher(Env env) {
-        this.env = env;
-    }
+// TopicListener implementation for workload schedule policy topics; the handling itself
+// lives in handle_topic_info().
+class WorkloadschedPolicyListener : public TopicListener {
+public:
+    // explicit: an ExecEnv* must not convert to a listener implicitly. The pointer is only
+    // stored; this class never deletes it.
+    explicit WorkloadschedPolicyListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
 
-    @Override
-    public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
-    }
-}
+    void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override;
+
+private:
+    ExecEnv* _exec_env = nullptr;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4c1060891f5..f8a52ad1ec4 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -63,6 +63,7 @@ class InvertedIndexSearcherCache;
 class InvertedIndexQueryCache;
 } // namespace segment_v2
 
+class WorkloadSchedPolicyMgr;
 class BfdParser;
 class BrokerMgr;
 template <class T>
@@ -153,6 +154,7 @@ public:
     pipeline::TaskScheduler* pipeline_task_scheduler() { return 
_without_group_task_scheduler; }
     pipeline::TaskScheduler* pipeline_task_group_scheduler() { return 
_with_group_task_scheduler; }
     taskgroup::TaskGroupManager* task_group_manager() { return 
_task_group_manager; }
+    WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return 
_workload_sched_mgr; } // Workload schedule policy manager of this ExecEnv; nullptr until _init() creates it.
 
     // using template to simplify client cache management
     template <typename T>
@@ -378,6 +380,8 @@ private:
     std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_with_group_block_scheduler;
 
     doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = 
nullptr;
+
+    WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4df69d67c03..48d229b9e21 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -76,6 +76,7 @@
 #include "runtime/task_group/task_group_manager.h"
 #include "runtime/thread_context.h"
 #include "runtime/user_function_cache.h"
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
 #include "service/backend_options.h"
 #include "service/backend_service.h"
 #include "service/point_query_executor.h"
@@ -194,6 +195,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               .set_max_threads(1)
                               .set_max_queue_size(1000000)
                               .build(&_lazy_release_obj_pool));
+
+    _workload_sched_mgr = new WorkloadSchedPolicyMgr();
+    _workload_sched_mgr->start(this);
+
     init_file_cache_factory();
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _task_group_manager = new taskgroup::TaskGroupManager();
@@ -519,6 +524,8 @@ void ExecEnv::destroy() {
     SAFE_STOP(_group_commit_mgr);
     // _routine_load_task_executor should be stopped before 
_new_load_stream_mgr.
     SAFE_STOP(_routine_load_task_executor);
+    // stop workload scheduler
+    SAFE_STOP(_workload_sched_mgr);
     // stop pipline step 1, non-cgroup execution
     SAFE_SHUTDOWN(_without_group_block_scheduler.get());
     SAFE_STOP(_without_group_task_scheduler);
@@ -593,6 +600,7 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_bfd_parser);
     SAFE_DELETE(_result_cache);
     SAFE_DELETE(_fragment_mgr);
+    SAFE_DELETE(_workload_sched_mgr);
     SAFE_DELETE(_task_group_manager);
     SAFE_DELETE(_with_group_task_scheduler);
     SAFE_DELETE(_without_group_task_scheduler);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 65e1ba475ae..9256c3bccfd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -73,6 +73,7 @@
 #include "runtime/task_group/task_group_manager.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
+#include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
@@ -1567,4 +1568,23 @@ void 
FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
     }
 }
 
+// Appends one WorkloadQueryInfo per entry of _query_ctx_map to `query_info_list`.
+// _lock is held for the whole scan. Only the QUERY_TIME metric is filled in for now; it is
+// the value of QueryContext::query_time() multiplied by 1000, stored as a decimal string.
+void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
+    std::lock_guard<std::mutex> lock(_lock);
+    // todo: use monotonic time
+    VecDateTimeValue now = VecDateTimeValue::local_time();
+    for (const auto& q : _query_ctx_map) {
+        WorkloadQueryInfo workload_query_info;
+        workload_query_info.query_id = print_id(q.first);
+        workload_query_info.tquery_id = q.first;
+
+        // Keep the value signed. query_time() returns int64_t and `now` is wall-clock time,
+        // so the difference can be negative; stored in an unsigned variable it would wrap
+        // to a huge number whose string form std::stol (used by the conditions) rejects
+        // with std::out_of_range.
+        int64_t query_time_millisecond = q.second->query_time(now) * 1000;
+        workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME,
+                                               std::to_string(query_time_millisecond));
+        // todo, add scan rows, scan bytes
+        query_info_list->push_back(workload_query_info);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 21d85503803..f9a6d4f72ed 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -64,6 +64,7 @@ class TPipelineInstanceParams;
 class TScanColumnDesc;
 class TScanOpenParams;
 class Thread;
+class WorkloadQueryInfo;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
@@ -153,6 +154,8 @@ public:
 
     std::string dump_pipeline_tasks();
 
+    // Appends runtime information about every query currently tracked by this manager to
+    // `query_info_list` (the parameter name now matches the definition in fragment_mgr.cpp).
+    void get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list);
+
 private:
     void cancel_unlocked_impl(const TUniqueId& id, const 
PPlanFragmentCancelReason& reason,
                               const std::unique_lock<std::mutex>& state_lock, 
bool is_pipeline,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a230fd653e8..203c5b6e3f4 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -93,6 +93,8 @@ public:
         return false;
     }
 
+    int64_t query_time(VecDateTimeValue& now) { return 
now.second_diff(_start_time); } // Returns now.second_diff(_start_time); presumably seconds elapsed since the query started (the FragmentMgr caller multiplies it by 1000) - confirm.
+
     void set_thread_token(int concurrency, bool is_serial) {
         _thread_token = 
_exec_env->scanner_scheduler()->new_limited_scan_pool_token(
                 is_serial ? ThreadPool::ExecutionMode::SERIAL
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 b/be/src/runtime/workload_management/workload_action.cpp
similarity index 55%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_action.cpp
index 6c5ce9e4c11..39916bc7cc1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -15,23 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.publish;
+#include "runtime/workload_management/workload_action.h"
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include "runtime/fragment_mgr.h"
 
-public class WorkloadGroupPublisher implements TopicPublisher {
+namespace doris {
 
-    private Env env;
+// Logs the printable query id, then asks the FragmentMgr of the process-wide ExecEnv to
+// cancel the query identified by query_info->tquery_id with reason INTERNAL_ERROR.
+void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
+    LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id;
+    auto* fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
+    fragment_mgr->cancel_query(query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
+                               std::string("query canceled by workload scheduler"));
+}
 
-    public WorkloadGroupPublisher(Env env) {
-        this.env = env;
-    }
+// Not implemented yet: only logs the group this action was configured with; `query_info`
+// is not used.
+void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
+    LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
+}
 
-    @Override
-    public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
-    }
-}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_action.h 
b/be/src/runtime/workload_management/workload_action.h
new file mode 100644
index 00000000000..29c01320b78
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_action.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/workload_management/workload_query_info.h"
+
+namespace doris {
+
+enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 }; // Kinds of action a workload schedule policy can run on a query.
+
+class WorkloadAction { // Interface of an action that a workload schedule policy applies to a query.
+public:
+    WorkloadAction() = default;
+    virtual ~WorkloadAction() = default;
+
+    virtual void exec(WorkloadQueryInfo* query_info) = 0; // Applies the action to the query described by query_info.
+
+    virtual WorkloadActionType get_action_type() = 0; // Tells which kind of action the object implements.
+};
+
+class WorkloadActionCancelQuery : public WorkloadAction { // Action of type CANCEL_QUERY.
+public:
+    void exec(WorkloadQueryInfo* query_info) override; // Defined in workload_action.cpp, where it calls FragmentMgr::cancel_query().
+
+    WorkloadActionType get_action_type() override { return CANCEL_QUERY; }
+};
+
+// Action of type MOVE_QUERY_TO_GROUP. It keeps the argument it was created with, which
+// exec() logs as the group.
+//todo(wb) implement it
+class WorkloadActionMoveQuery : public WorkloadAction {
+public:
+    // explicit, so a plain string never converts to an action implicitly; the name is
+    // taken by const reference and copied once into the member.
+    explicit WorkloadActionMoveQuery(const std::string& wg_name) : _wg_name(wg_name) {}
+    void exec(WorkloadQueryInfo* query_info) override;
+
+    WorkloadActionType get_action_type() override { return MOVE_QUERY_TO_GROUP; }
+
+private:
+    std::string _wg_name;
+};
+
+// Creates the WorkloadAction that corresponds to a thrift TWorkloadAction.
+class WorkloadActionFactory {
+public:
+    // Dispatches on action->action. An action type that is not handled here is logged and
+    // reported to the caller as nullptr.
+    static std::unique_ptr<WorkloadAction> create_workload_action(TWorkloadAction* action) {
+        switch (action->action) {
+        case TWorkloadActionType::type::MOVE_QUERY_TO_GROUP:
+            return std::make_unique<WorkloadActionMoveQuery>(action->action_args);
+        case TWorkloadActionType::type::CANCEL_QUERY:
+            return std::make_unique<WorkloadActionCancelQuery>();
+        default:
+            break;
+        }
+        LOG(ERROR) << "not find a action " << action->action;
+        return nullptr;
+    }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_comparator.h 
b/be/src/runtime/workload_management/workload_comparator.h
new file mode 100644
index 00000000000..00981a89dc8
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_comparator.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+enum WorkloadCompareOperator { EQUAL, GREATER, GREATER_EQUAL, LESS, 
LESS_EQUAL, INVALID_OP }; // Operators usable in workload conditions; INVALID_OP stands for a thrift operator with no counterpart here.
+
+// Stateless helpers that translate thrift compare operators into WorkloadCompareOperator
+// and evaluate `first_val <op> second_val`.
+class WorkloadCompareUtils {
+public:
+    // Maps a thrift operator to its local counterpart. Any other value is logged and
+    // mapped to INVALID_OP.
+    static WorkloadCompareOperator get_workload_compare_operator(TCompareOperator::type t_op) {
+        if (TCompareOperator::type::EQUAL == t_op) {
+            return EQUAL;
+        } else if (TCompareOperator::type::GREATER == t_op) {
+            return GREATER;
+        } else if (TCompareOperator::type::GREATER_EQUAL == t_op) {
+            return GREATER_EQUAL;
+        } else if (TCompareOperator::type::LESS == t_op) {
+            return LESS;
+        } else if (TCompareOperator::type::LESS_EQUAL == t_op) {
+            return LESS_EQUAL;
+        }
+        LOG(ERROR) << "can not find a valid op ";
+        return INVALID_OP;
+    }
+
+    // Returns the result of `first_val <op> second_val`. An operator outside the five
+    // comparisons (i.e. INVALID_OP) is logged and evaluates to false.
+    static bool compare_signed_integer(WorkloadCompareOperator op, int64_t first_val,
+                                       int64_t second_val) {
+        switch (op) {
+        case EQUAL:
+            return first_val == second_val;
+        case GREATER:
+            return first_val > second_val;
+        case GREATER_EQUAL:
+            return first_val >= second_val;
+        case LESS:
+            return first_val < second_val;
+        case LESS_EQUAL:
+            return first_val <= second_val;
+        default:
+            LOG(ERROR) << "unexpected signed integer compare operator " << op;
+            return false;
+        }
+    }
+
+    // Strings only support EQUAL; every other operator is logged and evaluates to false.
+    // Both operands are taken by const reference so a comparison does not copy them.
+    static bool compare_string(WorkloadCompareOperator op, const std::string& first_val,
+                               const std::string& second_val) {
+        switch (op) {
+        case EQUAL:
+            return first_val == second_val;
+        default:
+            LOG(ERROR) << "unexpected string compare operator " << op;
+            return false;
+        }
+    }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.cpp 
b/be/src/runtime/workload_management/workload_condition.cpp
new file mode 100644
index 00000000000..dff6f2adc24
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_condition.h"
+
+namespace doris {
+
+// query time
+// Keeps the compare operator and the threshold parsed from `str_val` with std::stol, which
+// throws if the text is not a number or does not fit into a long.
+WorkloadConditionQueryTime::WorkloadConditionQueryTime(WorkloadCompareOperator op,
+                                                       std::string str_val)
+        : _query_time(std::stol(str_val)), _op(op) {}
+
+// Parses `str_val` as an integer and reports whether `<parsed value> <op> <threshold>` holds.
+bool WorkloadConditionQueryTime::eval(std::string str_val) {
+    return WorkloadCompareUtils::compare_signed_integer(_op, std::stol(str_val), _query_time);
+}
+
+// scan rows
+// Keeps the compare operator and the threshold parsed from `str_val` with std::stol, which
+// throws if the text is not a number or does not fit into a long.
+WorkloadConditionScanRows::WorkloadConditionScanRows(WorkloadCompareOperator op,
+                                                     std::string str_val)
+        : _scan_rows(std::stol(str_val)), _op(op) {}
+
+// Parses `str_val` as an integer and reports whether `<parsed value> <op> <threshold>` holds.
+bool WorkloadConditionScanRows::eval(std::string str_val) {
+    return WorkloadCompareUtils::compare_signed_integer(_op, std::stol(str_val), _scan_rows);
+}
+
+// scan bytes
+// Keeps the compare operator and the threshold parsed from `str_val` with std::stol, which
+// throws if the text is not a number or does not fit into a long.
+WorkloadConditionScanBytes::WorkloadConditionScanBytes(WorkloadCompareOperator op,
+                                                       std::string str_val)
+        : _scan_bytes(std::stol(str_val)), _op(op) {}
+
+// todo(wb): need handle invalid input value
+// Parses `str_val` as an integer and reports whether `<parsed value> <op> <threshold>` holds.
+bool WorkloadConditionScanBytes::eval(std::string str_val) {
+    return WorkloadCompareUtils::compare_signed_integer(_op, std::stol(str_val), _scan_bytes);
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h 
b/be/src/runtime/workload_management/workload_condition.h
new file mode 100644
index 00000000000..3486742c968
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/BackendService_types.h>
+
+#include "runtime/workload_management/workload_comparator.h"
+
+namespace doris {
+
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES }; // Per-query metrics a workload condition can be evaluated against.
+
+class WorkloadCondition { // Interface of a single condition of a workload schedule policy.
+public:
+    WorkloadCondition() = default;
+    virtual ~WorkloadCondition() = default;
+
+    virtual bool eval(std::string str_val) = 0; // Tells whether the metric value, given as text, satisfies the condition.
+
+    virtual WorkloadMetricType get_workload_metric_type() = 0; // Metric this condition has to be fed with.
+};
+
+class WorkloadConditionQueryTime : public WorkloadCondition { // Condition on the QUERY_TIME metric.
+public:
+    WorkloadConditionQueryTime(WorkloadCompareOperator op, std::string 
str_val); // str_val is the threshold as text; it is parsed with std::stol.
+
+    bool eval(std::string str_val) override; // Compares the parsed str_val (left operand) with the threshold using _op.
+
+    WorkloadMetricType get_workload_metric_type() override {
+        return WorkloadMetricType::QUERY_TIME;
+    }
+
+private:
+    int64_t _query_time; // Threshold; presumably milliseconds, since FragmentMgr reports QUERY_TIME multiplied by 1000 - confirm.
+    WorkloadCompareOperator _op;
+};
+
+class WorkloadConditionScanRows : public WorkloadCondition { // Condition on the SCAN_ROWS metric.
+public:
+    WorkloadConditionScanRows(WorkloadCompareOperator op, std::string str_val); // str_val is the threshold as text; it is parsed with std::stol.
+    bool eval(std::string str_val) override; // Compares the parsed str_val (left operand) with the threshold using _op.
+    WorkloadMetricType get_workload_metric_type() override { return 
WorkloadMetricType::SCAN_ROWS; }
+
+private:
+    int64_t _scan_rows; // Threshold the metric value is compared against.
+    WorkloadCompareOperator _op;
+};
+
+class WorkloadConditionScanBytes : public WorkloadCondition { // Condition on the SCAN_BYTES metric.
+public:
+    WorkloadConditionScanBytes(WorkloadCompareOperator op, std::string 
str_val); // str_val is the threshold as text; it is parsed with std::stol.
+    bool eval(std::string str_val) override; // Compares the parsed str_val (left operand) with the threshold using _op.
+    WorkloadMetricType get_workload_metric_type() override {
+        return WorkloadMetricType::SCAN_BYTES;
+    }
+
+private:
+    int64_t _scan_bytes; // Threshold the metric value is compared against.
+    WorkloadCompareOperator _op;
+};
+
+// Creates the WorkloadCondition that corresponds to a thrift TWorkloadCondition.
+class WorkloadConditionFactory {
+public:
+    // Returns nullptr when the condition cannot be evaluated here, i.e. when either its
+    // compare operator or its metric is not supported. The condition constructors parse
+    // t_cond->value with std::stol, so a non-numeric value still throws.
+    static std::unique_ptr<WorkloadCondition> create_workload_condition(
+            TWorkloadCondition* t_cond) {
+        WorkloadCompareOperator op =
+                WorkloadCompareUtils::get_workload_compare_operator(t_cond->op);
+        if (op == INVALID_OP) {
+            // Such a condition could never hold: every eval() would only log an error and
+            // return false. Reject it up front instead.
+            LOG(ERROR) << "invalid compare operator for metric " << t_cond->metric_name;
+            return nullptr;
+        }
+        const std::string& str_val = t_cond->value;
+        TWorkloadMetricType::type metric_name = t_cond->metric_name;
+        if (TWorkloadMetricType::type::QUERY_TIME == metric_name) {
+            return std::make_unique<WorkloadConditionQueryTime>(op, str_val);
+        } else if (TWorkloadMetricType::type::SCAN_ROWS == metric_name) {
+            return std::make_unique<WorkloadConditionScanRows>(op, str_val);
+        } else if (TWorkloadMetricType::type::SCAN_BYTES == metric_name) {
+            return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
+        }
+        LOG(ERROR) << "not find a metric name " << metric_name;
+        return nullptr;
+    }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 b/be/src/runtime/workload_management/workload_query_info.h
similarity index 59%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_query_info.h
index 6c5ce9e4c11..9c24e9dee8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_query_info.h
@@ -15,23 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.publish;
+#pragma once
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include <map>
 
-public class WorkloadGroupPublisher implements TopicPublisher {
+#include "runtime/workload_management/workload_condition.h"
 
-    private Env env;
+namespace doris {
 
-    public WorkloadGroupPublisher(Env env) {
-        this.env = env;
-    }
+class WorkloadQueryInfo { // Per-query data filled by FragmentMgr::get_runtime_query_info() and passed to WorkloadAction::exec().
+public:
+    std::map<WorkloadMetricType, std::string> metric_map; // Metric values keyed by metric type, kept as text.
+    TUniqueId tquery_id; // Thrift id of the query.
+    std::string query_id; // Printable form of the id (FragmentMgr fills it with print_id()).
+};
 
-    @Override
-    public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
-    }
-}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp 
b/be/src/runtime/workload_management/workload_sched_policy.cpp
new file mode 100644
index 00000000000..bc543a7d770
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy.h"
+
+namespace doris {
+
+// Fills in the policy and takes ownership of the condition and action lists.
+//
+// _first_action_type becomes the type of the first MOVE_QUERY_TO_GROUP or
+// CANCEL_QUERY action in the list; when the list holds neither, it is the type
+// of the first action. A policy that arrives without any action is disabled, so
+// is_match() never selects it and _first_action_type is never consulted for it.
+void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool enabled,
+                               int priority,
+                               std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
+                               std::vector<std::unique_ptr<WorkloadAction>> action_list) {
+    _id = id;
+    _name = std::move(name);
+    _version = version;
+    _enabled = enabled;
+    _priority = priority;
+    _condition_list = std::move(condition_list);
+    _action_list = std::move(action_list);
+
+    if (_action_list.empty()) {
+        // Indexing element 0 of an empty vector is undefined behaviour.
+        LOG(WARNING) << "workload sched policy " << _name << " has no action, disable it";
+        _enabled = false;
+        return;
+    }
+
+    _first_action_type = _action_list[0]->get_action_type();
+    // one policy can not contain both move and cancel, so the first of either decides the type
+    for (const auto& action : _action_list) {
+        const WorkloadActionType cur_action_type = action->get_action_type();
+        if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP ||
+            cur_action_type == WorkloadActionType::CANCEL_QUERY) {
+            _first_action_type = cur_action_type;
+            break;
+        }
+    }
+}
+
+// Returns true only when the policy is enabled and every condition both finds
+// its metric in the query's metric map and evaluates to true on that value.
+bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
+    if (!_enabled) {
+        return false;
+    }
+    auto& metrics = query_info_ptr->metric_map;
+    for (auto& condition : _condition_list) {
+        auto metric_it = metrics.find(condition->get_workload_metric_type());
+        // a metric the query does not report can not satisfy the condition
+        if (metric_it == metrics.end()) {
+            return false;
+        }
+
+        std::string metric_value = metric_it->second;
+        if (!condition->eval(metric_value)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Runs every action of this policy on the query, in list order.
+void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
+    for (auto& action : _action_list) {
+        action->exec(query_info);
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.h 
b/be/src/runtime/workload_management/workload_sched_policy.h
new file mode 100644
index 00000000000..82f42b9a0b4
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "runtime/workload_management/workload_action.h"
+#include "runtime/workload_management/workload_condition.h"
+
+namespace doris {
+
+// One workload schedule policy: a list of conditions tested against a query's
+// metrics and a list of actions run on the query when all conditions hold.
+// A default-constructed policy is disabled until init() is called.
+class WorkloadSchedPolicy {
+public:
+    WorkloadSchedPolicy() = default;
+    ~WorkloadSchedPolicy() = default;
+
+    // Stores the given values and takes ownership of both lists.
+    void init(int64_t id, std::string name, int version, bool enabled, int priority,
+              std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
+              std::vector<std::unique_ptr<WorkloadAction>> action_list);
+
+    bool enabled() const { return _enabled; }
+    int priority() const { return _priority; }
+
+    // True when the policy is enabled and all of its conditions hold for the query.
+    bool is_match(WorkloadQueryInfo* query_info);
+
+    // Only meaningful after init(); the member has no value before that.
+    WorkloadActionType get_first_action_type() const { return _first_action_type; }
+
+    // Runs every action of the policy on the query.
+    void exec_action(WorkloadQueryInfo* query_info);
+
+    int version() const { return _version; }
+
+private:
+    int64_t _id = 0;
+    std::string _name;
+    int _version = 0;
+    bool _enabled = false;
+    int _priority = 0;
+
+    std::vector<std::unique_ptr<WorkloadCondition>> _condition_list;
+    std::vector<std::unique_ptr<WorkloadAction>> _action_list;
+
+    WorkloadActionType _first_action_type;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
new file mode 100644
index 00000000000..8a30b5395eb
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
+
+#include "runtime/fragment_mgr.h"
+
+namespace doris {
+
+// Arms the stop latch, remembers the exec env and spawns the scheduler thread
+// that runs _schedule_workload(). A failed thread creation is only logged.
+void WorkloadSchedPolicyMgr::start(ExecEnv* exec_env) {
+    _stop_latch.reset(1);
+    _exec_env = exec_env;
+
+    Status create_status = Thread::create(
+            "workload", "workload_scheduler", [this]() { this->_schedule_workload(); }, &_thread);
+    if (create_status.ok()) {
+        LOG(INFO) << "start workload scheduler ";
+        return;
+    }
+    LOG(WARNING) << "create workload scheduler thread failed";
+}
+
+// Counts the stop latch down and joins the scheduler thread. A call made while
+// the latch is already at zero only logs and returns.
+void WorkloadSchedPolicyMgr::stop() {
+    std::lock_guard<std::shared_mutex> write_lock(_stop_lock);
+    if (_stop_latch.count() != 0) {
+        _stop_latch.count_down();
+        if (_thread) {
+            _thread->join();
+        }
+        LOG(INFO) << "workload schedule manager stopped, thread is finished. ";
+        return;
+    }
+    LOG(INFO) << "workload schedule manager is already stopped. ";
+}
+
+// Brings _id_policy_map in line with new_policy_map under the write lock:
+// unknown ids are inserted, known ids are replaced only by a strictly newer
+// version, and ids missing from new_policy_map are removed.
+void WorkloadSchedPolicyMgr::update_workload_sched_policy(
+        std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> new_policy_map) {
+    std::lock_guard<std::shared_mutex> write_lock(_policy_lock);
+    // 1 upsert
+    for (const auto& [id, policy] : new_policy_map) {
+        auto existing = _id_policy_map.find(id);
+        if (existing == _id_policy_map.end()) {
+            _id_policy_map.emplace(id, policy);
+        } else if (policy->version() > existing->second->version()) {
+            existing->second = policy;
+        }
+    }
+
+    // 2 delete
+    auto iter = _id_policy_map.begin();
+    while (iter != _id_policy_map.end()) {
+        if (new_policy_map.count(iter->first) == 0) {
+            iter = _id_policy_map.erase(iter);
+        } else {
+            ++iter;
+        }
+    }
+}
+
+// Scheduler thread body. Every 500 ms, until the stop latch is released, it
+// fetches the running queries and, for each query, picks the highest-priority
+// matching policy per first-action type and runs that policy's actions.
+void WorkloadSchedPolicyMgr::_schedule_workload() {
+    while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
+        // 1 get query info
+        std::vector<WorkloadQueryInfo> query_list;
+        _exec_env->fragment_mgr()->get_runtime_query_info(&query_list);
+        // todo: add timer
+        LOG(INFO) << "[workload_schedule] get query list size=" << query_list.size();
+
+        for (auto& query_info : query_list) {
+            WorkloadQueryInfo* query_info_ptr = &query_info;
+            // 2 get matched policy, keeping one policy per first-action type
+            std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>> matched_policy_map;
+            {
+                std::shared_lock<std::shared_mutex> read_lock(_policy_lock);
+                for (auto& entry : _id_policy_map) {
+                    auto& candidate = entry.second;
+                    if (!candidate->is_match(query_info_ptr)) {
+                        continue;
+                    }
+                    WorkloadActionType candidate_type = candidate->get_first_action_type();
+                    auto current = matched_policy_map.find(candidate_type);
+                    // a later candidate wins only with a strictly higher priority
+                    if (current == matched_policy_map.end() ||
+                        candidate->priority() > current->second->priority()) {
+                        matched_policy_map[candidate_type] = candidate;
+                    }
+                }
+            }
+
+            if (matched_policy_map.empty()) {
+                continue;
+            }
+            LOG(INFO) << "[workload_schedule] matched policy size=" << matched_policy_map.size();
+            // 3 check action conflicts: move and cancel can not both run, cancel wins a tie
+            auto move_it = matched_policy_map.find(WorkloadActionType::MOVE_QUERY_TO_GROUP);
+            auto cancel_it = matched_policy_map.find(WorkloadActionType::CANCEL_QUERY);
+            if (move_it != matched_policy_map.end() && cancel_it != matched_policy_map.end()) {
+                int move_prio = move_it->second->priority();
+                int cancel_prio = cancel_it->second->priority();
+                if (cancel_prio >= move_prio) {
+                    matched_policy_map.erase(move_it);
+                } else {
+                    matched_policy_map.erase(cancel_it);
+                }
+            }
+
+            // 4 exec policy action
+            for (const auto& matched : matched_policy_map) {
+                matched.second->exec_action(query_info_ptr);
+            }
+        }
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 b/be/src/runtime/workload_management/workload_sched_policy_mgr.h
similarity index 51%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_sched_policy_mgr.h
index 6c5ce9e4c11..52b41eacf4b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.h
@@ -15,23 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.publish;
+#pragma once
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include "runtime/exec_env.h"
+#include "runtime/workload_management/workload_sched_policy.h"
+#include "util/countdown_latch.h"
 
-public class WorkloadGroupPublisher implements TopicPublisher {
+namespace doris {
 
-    private Env env;
+// Keeps the workload schedule policies known to this process and owns the
+// background thread that applies them to running queries.
+//
+// start() hands `this` to the scheduler thread and the defaulted destructor
+// does not join it, so stop() has to be called before the manager is destroyed.
+class WorkloadSchedPolicyMgr {
+public:
+    WorkloadSchedPolicyMgr() : _stop_latch(0) {}
+    ~WorkloadSchedPolicyMgr() = default;
 
-    public WorkloadGroupPublisher(Env env) {
-        this.env = env;
-    }
+    // Spawns the scheduler thread; exec_env is used by that thread to list queries.
+    void start(ExecEnv* exec_env);
 
-    @Override
-    public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
-    }
-}
+    // Releases the stop latch and joins the scheduler thread.
+    void stop();
+
+    // Replaces the stored policy set with policy_map (newer versions win, absent ids are dropped).
+    void update_workload_sched_policy(
+            std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map);
+
+private:
+    void _schedule_workload();
+
+    // guards _id_policy_map
+    std::shared_mutex _policy_lock;
+    std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> _id_policy_map;
+
+    // taken by stop()
+    std::shared_mutex _stop_lock;
+    CountDownLatch _stop_latch;
+    scoped_refptr<Thread> _thread;
+    // set by start(); null until then
+    ExecEnv* _exec_env = nullptr;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index cd77f70f59c..1ad131abb0a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -230,6 +230,7 @@ import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
+import 
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
 import org.apache.doris.scheduler.registry.ExportTaskRegister;
 import org.apache.doris.service.ExecuteEnv;
@@ -998,6 +999,8 @@ public class Env {
 
         TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
         topicPublisherThread.addToTopicPublisherList(wgPublisher);
+        WorkloadSchedPolicyPublisher wpPublisher = new 
WorkloadSchedPolicyPublisher(this);
+        topicPublisherThread.addToTopicPublisherList(wpPublisher);
         topicPublisherThread.start();
 
         workloadGroupMgr.startUpdateThread();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index 616c8a30b56..db5158e24f5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -59,9 +59,6 @@ public class TopicPublisherThread extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        if (!Config.enable_workload_group) {
-            return;
-        }
         LOG.info("begin publish topic info");
         // step 1: get all publish topic info
         TPublishTopicRequest request = new TPublishTopicRequest();
@@ -69,6 +66,10 @@ public class TopicPublisherThread extends MasterDaemon {
             topicPublisher.getTopicInfo(request);
         }
 
+        if (request.getTopicMap().size() == 0) {
+            return;
+        }
+
         // step 2: publish topic info to all be
         Collection<Backend> nodesToPublish = 
clusterInfoService.getIdToBackend().values();
         AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 6c5ce9e4c11..e81ccb06d3e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -20,6 +20,9 @@ package org.apache.doris.common.publish;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
+
+import java.util.List;
 
 public class WorkloadGroupPublisher implements TopicPublisher {
 
@@ -31,7 +34,9 @@ public class WorkloadGroupPublisher implements TopicPublisher 
{
 
     @Override
     public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
+        // only add the topic when there is something to publish
+        List<TopicInfo> groupInfos = env.getWorkloadGroupMgr().getPublishTopicInfo();
+        if (!groupInfos.isEmpty()) {
+            req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, groupInfos);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 827c2367133..d514ea62d2d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -22,8 +22,16 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCompareOperator;
+import org.apache.doris.thrift.TWorkloadAction;
+import org.apache.doris.thrift.TWorkloadActionType;
+import org.apache.doris.thrift.TWorkloadCondition;
+import org.apache.doris.thrift.TWorkloadMetricType;
+import org.apache.doris.thrift.TWorkloadSchedPolicy;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.esotericsoftware.minlog.Log;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 
@@ -41,6 +49,23 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     public static final ImmutableSet<String> POLICY_PROPERTIES = new 
ImmutableSet.Builder<String>()
             .add(ENABLED).add(PRIORITY).build();
 
+    // Lookup tables used to convert fe types to thrift types. A key that is absent
+    // from a table has no thrift counterpart yet.
+    private static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
+            = new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
+            .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME).build();
+    private static final ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
+            = new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
+            .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
+            .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();
+
+    private static final ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
+            = new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
+            .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
+            .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
+            .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
+            .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
+            .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();
+
     @SerializedName(value = "id")
     long id;
     @SerializedName(value = "name")
@@ -173,6 +198,48 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         return actionMetaList;
     }
 
+    /**
+     * Converts this policy to the thrift form published to backends.
+     *
+     * @return the topic info, or null when a condition metric, a condition
+     *         operator or an action type has no thrift counterpart
+     */
+    public TopicInfo toTopicInfo() {
+        TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy();
+        tPolicy.setId(id);
+        tPolicy.setName(name);
+        tPolicy.setVersion(version);
+        tPolicy.setPriority(priority);
+        tPolicy.setEnabled(enabled);
+
+        List<TWorkloadCondition> condList = new ArrayList<>();
+        for (WorkloadConditionMeta cond : conditionMetaList) {
+            TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
+            TCompareOperator op = OP_MAP.get(cond.op);
+            // an unmapped operator is treated like an unmapped metric: skip the policy
+            if (metricType == null || op == null) {
+                return null;
+            }
+            TWorkloadCondition tCond = new TWorkloadCondition();
+            tCond.setMetricName(metricType);
+            tCond.setOp(op);
+            tCond.setValue(cond.value);
+            condList.add(tCond);
+        }
+
+        List<TWorkloadAction> actionList = new ArrayList<>();
+        for (WorkloadActionMeta action : actionMetaList) {
+            TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
+            if (tActionType == null) {
+                return null;
+            }
+            TWorkloadAction tAction = new TWorkloadAction();
+            tAction.setAction(tActionType);
+            tAction.setActionArgs(action.actionArgs);
+            actionList.add(tAction);
+        }
+
+        tPolicy.setConditionList(condList);
+        tPolicy.setActionList(actionList);
+
+        TopicInfo topicInfo = new TopicInfo();
+        topicInfo.setWorkloadSchedPolicy(tPolicy);
+
+        return topicInfo;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 346e34796c7..45ba3a35de4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -35,6 +35,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -412,6 +413,22 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
         lock.writeLock().unlock();
     }
 
+    /**
+     * Collects the thrift form of every policy under the read lock. Policies whose
+     * conversion returns null are left out.
+     */
+    public List<TopicInfo> getPublishTopicInfoList() {
+        List<TopicInfo> topicInfoList = new ArrayList<>();
+        readLock();
+        try {
+            for (WorkloadSchedPolicy policy : idToPolicy.values()) {
+                TopicInfo tInfo = policy.toTopicInfo();
+                if (tInfo != null) {
+                    topicInfoList.add(tInfo);
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+        return topicInfoList;
+    }
+
     public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
         insertWorkloadSchedPolicy(policy);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
similarity index 68%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
index 6c5ce9e4c11..5083d183e6c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
@@ -15,23 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.publish;
+package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.publish.TopicPublisher;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
 
-public class WorkloadGroupPublisher implements TopicPublisher {
+import java.util.List;
+
+public class WorkloadSchedPolicyPublisher implements TopicPublisher {
 
     private Env env;
 
-    public WorkloadGroupPublisher(Env env) {
+    public WorkloadSchedPolicyPublisher(Env env) {
         this.env = env;
     }
 
     @Override
     public void getTopicInfo(TPublishTopicRequest req) {
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
-                env.getWorkloadGroupMgr().getPublishTopicInfo());
+        // only add the topic when there is at least one policy to publish
+        List<TopicInfo> policyInfos = env.getWorkloadSchedPolicyMgr().getPublishTopicInfoList();
+        if (!policyInfos.isEmpty()) {
+            req.putToTopicMap(TTopicInfoType.WORKLOAD_SCHED_POLICY, policyInfos);
+        }
     }
+
 }
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 855b7489711..2c5b199fc17 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -163,6 +163,7 @@ struct TQueryIngestBinlogResult {
 enum TTopicInfoType {
     WORKLOAD_GROUP
     MOVE_QUERY_TO_GROUP
+    WORKLOAD_SCHED_POLICY
 }
 
 struct TWorkloadGroupInfo {
@@ -178,12 +179,53 @@ struct TWorkloadGroupInfo {
 
 struct TWorkloadMoveQueryToGroupAction {
     1: optional Types.TUniqueId query_id
-    2: optional i64 workload_group_id;
+    2: optional i64 workload_group_id
+}
+
+enum TWorkloadMetricType { // metric named by TWorkloadCondition.metric_name
+    QUERY_TIME
+    SCAN_ROWS
+    SCAN_BYTES
+}
+
+enum TCompareOperator { // comparison carried in TWorkloadCondition.op
+    EQUAL
+    GREATER
+    GREATER_EQUAL
+    LESS
+    LESS_EQUAL
+}
+
+struct TWorkloadCondition { // one condition of a TWorkloadSchedPolicy
+    1: optional TWorkloadMetricType metric_name // which metric is tested
+    2: optional TCompareOperator op // how the metric is compared with value
+    3: optional string value // comparison operand, carried as a string
+}
+
+enum TWorkloadActionType { // action kind carried in TWorkloadAction.action
+    MOVE_QUERY_TO_GROUP
+    CANCEL_QUERY
+}
+
+struct TWorkloadAction { // one action of a TWorkloadSchedPolicy
+    1: optional TWorkloadActionType action // what to do with the query
+    2: optional string action_args // argument string for the action; format is not defined in this file
+}
+
+struct TWorkloadSchedPolicy {
+    1: optional i64 id
+    2: optional string name
+    3: optional i32 version
+    4: optional i32 priority
+    5: optional bool enabled
+    6: optional list<TWorkloadCondition> condition_list
+    7: optional list<TWorkloadAction> action_list
 }
 
 struct TopicInfo {
     1: optional TWorkloadGroupInfo workload_group_info
     2: optional TWorkloadMoveQueryToGroupAction move_action
+    3: optional TWorkloadSchedPolicy workload_sched_policy
 }
 
 struct TPublishTopicRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to