This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new bdca9cce3e3 [branch-2.1]add internal workload group (#42006) (#44592)
bdca9cce3e3 is described below

commit bdca9cce3e3174dfeecb56ada2d3ebe173d311b6
Author: wangbo <wan...@selectdb.com>
AuthorDate: Tue Nov 26 17:09:55 2024 +0800

    [branch-2.1]add internal workload group (#42006) (#44592)
    
    Add an internal workload group when Doris starts; currently it is mainly
    used to manage the CPU usage of compaction workloads.
    pick #42006
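    
    For readers skimming the diff below: the core BE-side pattern is that the
    internal workload group owns a std::shared_ptr<CgroupCpuCtl>, while thread
    pools hold only a std::weak_ptr to it, and each worker thread tries to
    attach itself to the cgroup when it starts. A minimal standalone sketch of
    that ownership pattern follows (simplified and hypothetical; CgroupCpuCtl
    here is a stand-in, not the real class from be/src/agent/cgroup_cpu_ctl.h):
    
        #include <memory>
        #include <thread>
        #include <vector>
    
        // Stand-in for the real CgroupCpuCtl; the real one writes the worker's
        // tid into the cgroup's tasks file.
        struct CgroupCpuCtl {
            void add_thread_to_cgroup() { /* echo tid > cgroup tasks */ }
        };
    
        class SketchPool {
        public:
            // The pool keeps only a weak_ptr, so it never extends the
            // controller's lifetime; the owning workload group holds the
            // shared_ptr (mirrors ThreadPoolBuilder::set_cgroup_cpu_ctl below).
            explicit SketchPool(std::weak_ptr<CgroupCpuCtl> ctl) : _ctl(std::move(ctl)) {}
    
            void spawn_worker() {
                _workers.emplace_back([ctl = _ctl] {
                    // Mirrors ThreadPool::dispatch_thread(): attach only if
                    // the controller is still alive.
                    if (auto sptr = ctl.lock()) {
                        sptr->add_thread_to_cgroup();
                    }
                    // ... run queued tasks ...
                });
            }
    
            ~SketchPool() {
                for (auto& w : _workers) w.join();
            }
    
        private:
            std::weak_ptr<CgroupCpuCtl> _ctl;
            std::vector<std::thread> _workers;
        };
    
        int main() {
            auto ctl = std::make_shared<CgroupCpuCtl>(); // owned by the workload group
            SketchPool pool(ctl);                        // pool sees it only weakly
            pool.spawn_worker();
        }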
---
 be/src/agent/cgroup_cpu_ctl.cpp                    |   6 +-
 be/src/agent/cgroup_cpu_ctl.h                      |   2 +-
 be/src/agent/topic_subscriber.cpp                  |   6 +-
 be/src/olap/olap_server.cpp                        |  77 ++++---
 be/src/olap/storage_engine.h                       |   3 +-
 be/src/pipeline/task_scheduler.h                   |   4 +-
 be/src/runtime/exec_env_init.cpp                   |   4 +-
 be/src/runtime/workload_group/workload_group.cpp   |  94 +++++----
 be/src/runtime/workload_group/workload_group.h     |  20 +-
 .../workload_group/workload_group_manager.cpp      |  23 +++
 .../workload_group/workload_group_manager.h        |  12 ++
 be/src/util/threadpool.cpp                         |   7 +-
 be/src/util/threadpool.h                           |   6 +-
 be/src/vec/exec/scan/scanner_scheduler.h           |   9 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |  12 ++
 .../doris/analysis/AlterWorkloadGroupStmt.java     |  20 +-
 .../doris/analysis/CreateWorkloadGroupStmt.java    |  18 +-
 .../doris/analysis/DropWorkloadGroupStmt.java      |   3 -
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +
 .../java/org/apache/doris/common/FeConstants.java  |   3 +
 .../CreateInternalWorkloadGroupThread.java         |  55 +++++
 .../resource/workloadgroup/WorkloadGroup.java      |  81 ++++++--
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 109 +++++++---
 .../workloadgroup/WorkloadGroupMgrTest.java        | 222 +++++++++++++++++++++
 .../apache/doris/utframe/TestWithFeService.java    |   1 +
 gensrc/thrift/BackendService.thrift                |   3 +
 .../workload_manager_p0/test_curd_wlg.groovy       |  29 +++
 27 files changed, 693 insertions(+), 138 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index e68535a708c..76b72f2c9d0 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
     return _is_enable_cgroup_v2_in_env ? 100 : 1024;
 }
 
-std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
+std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
     if (_is_enable_cgroup_v2_in_env) {
-        return std::make_unique<CgroupV2CpuCtl>(wg_id);
+        return std::make_shared<CgroupV2CpuCtl>(wg_id);
     } else if (_is_enable_cgroup_v1_in_env) {
-        return std::make_unique<CgroupV1CpuCtl>(wg_id);
+        return std::make_shared<CgroupV1CpuCtl>(wg_id);
     }
     return nullptr;
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 84e191159f1..b23f1f4dd9c 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ public:
 
     static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
 
-    static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+    static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
 
     static bool is_a_valid_cgroup_path(std::string cg_path);
 
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index f62bdaef099..b470e1534e1 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     // eg, update workload info may delay other listener, then we need add a thread here
     // to handle_topic_info asynchronous
     std::shared_lock lock(_listener_mtx);
-    LOG(INFO) << "[topic_publish]begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
         if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
-            LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
-                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
+            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
+                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
         }
     }
 }
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ddd35ae2f92..f5f796d973b 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -125,7 +125,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
     return threads_num;
 }
 
-Status StorageEngine::start_bg_threads() {
+Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "unused_rowset_monitor_thread",
             [this]() { this->_unused_rowset_monitor_thread_callback(); },
@@ -155,29 +155,60 @@ Status StorageEngine::start_bg_threads() {
     auto single_replica_compaction_threads =
             get_single_replica_compaction_threads_num(data_dirs.size());
 
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_compaction_threads)
-                            .set_max_threads(base_compaction_threads)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_compaction_threads)
-                            .set_max_threads(cumu_compaction_threads)
-                            .build(&_cumu_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
-                            .set_min_threads(single_replica_compaction_threads)
-                            .set_max_threads(single_replica_compaction_threads)
-                            .build(&_single_replica_compaction_thread_pool));
-
-    if (config::enable_segcompaction) {
-        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                                .set_min_threads(config::segcompaction_num_threads)
-                                .set_max_threads(config::segcompaction_num_threads)
-                                .build(&_seg_compaction_thread_pool));
+    if (wg_sptr && wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cold_data_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .build(&_cold_data_compaction_thread_pool));
     }
-    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
-                            .set_min_threads(config::cold_data_compaction_thread_num)
-                            .set_max_threads(config::cold_data_compaction_thread_num)
-                            .build(&_cold_data_compaction_thread_pool));
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 560b246274b..fcd993dd1ba 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -71,6 +71,7 @@ class TxnManager;
 class ReportWorker;
 class CreateTabletIdxCache;
 struct DirInfo;
+class WorkloadGroup;
 
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
 using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
@@ -171,7 +172,7 @@ public:
     }
 
     // start all background threads. This should be call after env is ready.
-    Status start_bg_threads();
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr);
 
     // clear trash and snapshot file
     // option: update disk usage after sweep
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index c33103bfd30..55aa31fc769 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -76,7 +76,7 @@ class TaskScheduler {
 public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
                   std::shared_ptr<TaskQueue> task_queue, std::string name,
-                  CgroupCpuCtl* cgroup_cpu_ctl)
+                  std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
             : _task_queue(std::move(task_queue)),
               _blocked_task_scheduler(std::move(b_scheduler)),
               _shutdown(false),
@@ -102,7 +102,7 @@ private:
     std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
     std::atomic<bool> _shutdown;
     std::string _name;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(size_t index);
 };
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e47e26e8f6b..5f54395a66f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -222,6 +222,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
+    _workload_group_manager->init_internal_workload_group();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -295,7 +296,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
         return st;
     }
     _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
-    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+    WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
+    if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
         LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << 
st;
         return st;
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 971750eb097..07d4177f7f6 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
 const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
-WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}
+
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
@@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_low_watermark(tg_info.spill_low_watermark),
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
-          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+          _need_create_query_thread_pool(need_create_query_thread_pool) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -419,35 +422,42 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
     return Status::OK();
 }
 
-void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
-    uint64_t tg_id = tg_info->id;
-    std::string tg_name = tg_info->name;
-    int cpu_hard_limit = tg_info->cpu_hard_limit;
-    uint64_t cpu_shares = tg_info->cpu_share;
-    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
-    int scan_thread_num = tg_info->scan_thread_num;
-    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
-    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+std::weak_ptr<CgroupCpuCtl> WorkloadGroup::get_cgroup_cpu_ctl_wptr() {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    return _cgroup_cpu_ctl;
+}
 
+void WorkloadGroup::create_cgroup_cpu_ctl() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    create_cgroup_cpu_ctl_no_lock();
+}
+
+void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
     if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
+        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id);
         if (cgroup_cpu_ctl) {
             Status ret = cgroup_cpu_ctl->init();
             if (ret.ok()) {
                 _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
-                LOG(INFO) << "[upsert wg thread pool] cgroup init success, 
wg_id=" << tg_id;
+                LOG(INFO) << "[upsert wg thread pool] cgroup init success, 
wg_id=" << _id;
             } else {
-                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, 
wg_id=" << tg_id
+                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, 
wg_id=" << _id
                           << ", reason=" << ret.to_string();
             }
         } else {
-            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " 
<< tg_id << " failed";
+            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl 
wg_id=" << _id << " failed";
         }
     }
+}
 
-    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
-
+void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                               std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr,
+                                               ExecEnv* exec_env) {
+    uint64_t wg_id = wg_info->id;
+    std::string wg_name = wg_info->name;
+    int scan_thread_num = wg_info->scan_thread_num;
+    int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
     if (_task_sched == nullptr) {
         int32_t executors_size = config::pipeline_executor_size;
         if (executors_size <= 0) {
@@ -457,18 +467,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
                 std::make_unique<pipeline::TaskScheduler>(
                         exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
-                        "Pipe_" + tg_name, cg_cpu_ctl_ptr);
+                        "Pipe_" + wg_name, cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << wg_id;
         }
     }
 
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ wg_name,
                                                                       
cg_cpu_ctl_ptr);
         Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
                                            
config::doris_scanner_thread_pool_thread_num,
@@ -476,34 +486,33 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << wg_id;
         }
     }
     if (scan_thread_num > 0 && _scan_task_sched) {
         _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
     }
-
     if (_remote_scan_task_sched == nullptr) {
         int remote_max_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num();
         int remote_scan_thread_queue_size =
                 vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
                                                                       cg_cpu_ctl_ptr);
-        Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
+        Status ret = remote_scan_scheduler->start(remote_max_thread_num,
+                                                  config::doris_scanner_min_thread_pool_thread_num,
                                                   remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
             LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start 
failed, gid="
-                      << tg_id;
+                      << wg_id;
         }
     }
     if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
         _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
                                                   min_remote_scan_thread_num);
     }
-
     if (_memtable_flush_pool == nullptr) {
         int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num();
         // -1 means disk num may not be inited, so not create flush pool
@@ -512,7 +521,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
 
             size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store);
             size_t max_threads = num_disk * min_threads;
-            std::string pool_name = "wg_flush_" + tg_name;
+            std::string pool_name = "wg_flush_" + wg_name;
             auto ret = ThreadPoolBuilder(pool_name)
                                .set_min_threads(min_threads)
                                .set_max_threads(max_threads)
@@ -520,17 +529,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                .build(&thread_pool);
             if (!ret.ok()) {
                 LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
failed, gid="
-                          << tg_id;
+                          << wg_id;
             } else {
                 _memtable_flush_pool = std::move(thread_pool);
-                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
succ, gid=" << tg_id
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
succ, gid=" << wg_id
                           << ", max thread num=" << max_threads
                           << ", min thread num=" << min_threads;
             }
         }
     }
+}
+
+void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
+    uint64_t wg_id = wg_info->id;
+    int cpu_hard_limit = wg_info->cpu_hard_limit;
+    uint64_t cpu_shares = wg_info->cpu_share;
+    bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
+    create_cgroup_cpu_ctl_no_lock();
 
-    // step 6: update cgroup cpu if needed
     if (_cgroup_cpu_ctl) {
         if (enable_cpu_hard_limit) {
             if (cpu_hard_limit > 0) {
@@ -538,16 +554,26 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                 _cgroup_cpu_ctl->update_cpu_soft_limit(
                         CgroupCpuCtl::cpu_soft_limit_default_value());
             } else {
-                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit 
but value is illegal: "
-                          << cpu_hard_limit << ", gid=" << tg_id;
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit 
but value is "
+                             "illegal: "
+                          << cpu_hard_limit << ", gid=" << wg_id;
             }
         } else {
             _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
             _cgroup_cpu_ctl->update_cpu_hard_limit(
                     CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
         }
-        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                             &(tg_info->cgroup_cpu_hard_limit));
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
+                                             &(wg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info, ExecEnv* exec_env) {
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    upsert_cgroup_cpu_ctl_no_lock(wg_info);
+
+    if (_need_create_query_thread_pool) {
+        upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl, exec_env);
     }
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 5d6b201eaab..22b1405eeab 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -59,6 +59,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
+    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);
+
     int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -210,7 +212,17 @@ public:
     }
     int64_t get_remote_scan_bytes_per_second();
 
+    void create_cgroup_cpu_ctl();
+
+    std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
+
 private:
+    void create_cgroup_cpu_ctl_no_lock();
+    void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
+    void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                    std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr,
+                                    ExecEnv* exec_env);
+
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
     std::string _name;
@@ -241,7 +253,10 @@ private:
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
+    // _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup,
+    // but also some global background threadpool which not owned by WorkloadGroup,
+    // so it should be shared ptr;
+    std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
@@ -250,6 +265,9 @@ private:
     std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
     std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
 
+    // for some background workload, it doesn't need to create query thread pool
+    const bool _need_create_query_thread_pool;
+
     // bvar metric
     std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
     std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 145754dd357..bb9757c94c3 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -33,6 +33,25 @@
 
 namespace doris {
 
+void WorkloadGroupMgr::init_internal_workload_group() {
+    WorkloadGroupPtr internal_wg = nullptr;
+    {
+        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
+            WorkloadGroupInfo internal_wg_info {
+                    .id = INTERNAL_WORKLOAD_GROUP_ID,
+                    .name = INTERNAL_WORKLOAD_GROUP_NAME,
+                    .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
+            internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
+            _workload_groups[internal_wg_info.id] = internal_wg;
+        }
+    }
+    DCHECK(internal_wg != nullptr);
+    if (internal_wg) {
+        internal_wg->create_cgroup_cpu_ctl();
+    }
+}
+
 WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
         const WorkloadGroupInfo& workload_group_info) {
     {
@@ -85,6 +104,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
         old_wg_size = _workload_groups.size();
         for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
             uint64_t wg_id = iter->first;
+            // internal workload group created by BE can not be dropped
+            if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
+                continue;
+            }
             auto workload_group_ptr = iter->second;
             if (used_wg_id.find(wg_id) == used_wg_id.end()) {
                 workload_group_ptr->shutdown();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..52624f05fdf 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,11 +36,18 @@ class TaskScheduler;
 class MultiCoreTaskQueue;
 } // namespace pipeline
 
+// internal_group is used for doris internal workload, currently is mainly compaction
+const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
+        static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
+const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
     ~WorkloadGroupMgr() = default;
 
+    void init_internal_workload_group();
+
     WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
 
     void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
@@ -62,6 +69,11 @@ public:
 
     void get_wg_resource_usage(vectorized::Block* block);
 
+    WorkloadGroupPtr get_internal_wg() {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
+    }
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 15fb36181d4..f5ea38515de 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
     return *this;
 }
 
-ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
+ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
+        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
     _cgroup_cpu_ctl = cgroup_cpu_ctl;
     return *this;
 }
@@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
     _num_threads++;
     _num_threads_pending_start--;
 
-    if (_cgroup_cpu_ctl != nullptr) {
-        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
+    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
+        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
     }
 
    // Owned by this worker thread and added/removed from _idle_threads as needed.
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 5ce27e2f27b..9bd4a7246fb 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -107,7 +107,7 @@ public:
     ThreadPoolBuilder& set_min_threads(int min_threads);
     ThreadPoolBuilder& set_max_threads(int max_threads);
     ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
-    ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
+    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
     template <class Rep, class Period>
     ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
         _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
@@ -133,7 +133,7 @@ private:
     int _min_threads;
     int _max_threads;
     int _max_queue_size;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::chrono::milliseconds _idle_timeout;
 
     ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
@@ -345,7 +345,7 @@ private:
     // Protected by _lock.
     int _total_queued_tasks;
 
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     // All allocated tokens.
     //
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 56c49368598..7731b3ba8f9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,11 +114,8 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
-        _is_stop.store(false);
-        _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _sched_name = sched_name;
-    }
+    SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {}
 
     ~SimplifiedScanScheduler() {
         stop();
@@ -217,7 +214,7 @@ public:
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::string _sched_name;
 };
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index c83c66f241c..2d49a0e8978 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -113,6 +113,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         force_close(status);
     }
 
+    if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
+        if (auto cg_ctl_sptr =
+                    state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
+            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
+            if (ret.ok()) {
+                std::string wg_tname =
+                        "asyc_wr_" + state->get_query_ctx()->workload_group()->name();
+                Thread::set_self_name(wg_tname);
+            }
+        }
+    }
+
     if (_writer_status.ok()) {
         while (true) {
             ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
index f6120338333..becca898b64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
@@ -25,6 +25,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -55,14 +59,26 @@ public class AlterWorkloadGroupStmt extends DdlStmt {
         }
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be 
null");
+            throw new AnalysisException("Workload Group properties can't be 
empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can 
not be create or modified ");
+        }
+
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && 
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || 
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + 
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
 
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
-        sb.append("ALTER RESOURCE GROUP 
'").append(workloadGroupName).append("' ");
+        sb.append("ALTER WORKLOAD GROUP 
'").append(workloadGroupName).append("' ");
         sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", 
true, false)).append(")");
         return sb.toString();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
index 92a60a94e55..4c0c675ea00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
@@ -27,6 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -68,12 +71,19 @@ public class CreateWorkloadGroupStmt extends DdlStmt {
         FeNameFormat.checkWorkloadGroupName(workloadGroupName);
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be 
null");
+            throw new AnalysisException("Workload Group properties can't be 
empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can 
not be create or modified ");
         }
 
-        String wgTag = properties.get(WorkloadGroup.TAG);
-        if (wgTag != null) {
-            FeNameFormat.checkCommonName("workload group tag", wgTag);
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && 
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || 
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + 
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
index d7a1703771c..f5ffb6f2cd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
@@ -20,7 +20,6 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
@@ -50,8 +49,6 @@ public class DropWorkloadGroupStmt extends DdlStmt {
         if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
         }
-
-        FeNameFormat.checkWorkloadGroupName(workloadGroupName);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 03679d64330..5ca530669dd 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -246,6 +246,7 @@ import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -1751,6 +1752,7 @@ public class Env {
         WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
         topicPublisherThread.addToTopicPublisherList(wpPublisher);
         topicPublisherThread.start();
+        new CreateInternalWorkloadGroupThread().start();
 
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index f137c4cab49..b50eeffbced 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -50,6 +50,9 @@ public class FeConstants {
     // set to false to disable internal schema db
     public static boolean enableInternalSchemaDb = true;
 
+    // for UT, create internal workload group thread can not start
+    public static boolean shouldCreateInternalWorkloadGroup = true;
+
     // default scheduler interval is 10 seconds
     public static int default_scheduler_interval_millisecond = 10000;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
new file mode 100644
index 00000000000..7c6d0e3a080
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadgroup;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CreateInternalWorkloadGroupThread extends Thread {
+
+    private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
+
+    public CreateInternalWorkloadGroupThread() {
+        super("CreateInternalWorkloadGroupThread");
+    }
+
+    public void run() {
+        if (!FeConstants.shouldCreateInternalWorkloadGroup) {
+            return;
+        }
+        try {
+            Env env = Env.getCurrentEnv();
+            while (!env.isReady()) {
+                Thread.sleep(5000);
+            }
+            if (!env.getWorkloadGroupMgr()
+                    .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
+                env.getWorkloadGroupMgr().createInternalWorkloadGroup();
+                LOG.info("create internal workload group succ");
+            } else {
+                LOG.info("internal workload group already exists.");
+            }
+        } catch (Throwable t) {
+            LOG.warn("create internal workload group failed. ", t);
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index aa9bed42d7d..a026025c918 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
@@ -43,8 +45,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class WorkloadGroup implements Writable, GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);
@@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
 
+    // it's used to define Doris's internal workload group,
+    // currently it is internal, only contains compaction
+    // later more type and workload may be included in the future.
+    public static final String INTERNAL_TYPE = "internal_type";
+
     // NOTE(wb): all property is not required, some properties default value is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
@@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
+
+    public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
+            .put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build();
 
     public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
     public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@@ -386,18 +399,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
                     + SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")");
         }
 
-        String tagStr = properties.get(TAG);
-        if (!StringUtils.isEmpty(tagStr)) {
-            String[] tagArr = tagStr.split(",");
-            for (String tag : tagArr) {
-                try {
-                    FeNameFormat.checkCommonName("workload group tag name", 
tag);
-                } catch (AnalysisException e) {
-                    throw new DdlException("workload group tag name format is 
illegal, " + tagStr);
-                }
-            }
-        }
-
         if (properties.containsKey(READ_BYTES_PER_SECOND)) {
             String readBytesVal = properties.get(READ_BYTES_PER_SECOND);
             try {
@@ -427,6 +428,37 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             }
         }
 
+        String tagStr = properties.get(TAG);
+        if (!StringUtils.isEmpty(tagStr)) {
+            String[] tagArr = tagStr.split(",");
+            for (String tag : tagArr) {
+                try {
+                    FeNameFormat.checkCommonName("workload group tag", tag);
+                } catch (AnalysisException e) {
+                    throw new DdlException("tag format is illegal, " + tagStr);
+                }
+            }
+        }
+
+        // internal workload group is usually created by Doris.
+        // If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP.
+        String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
+        if (!StringUtils.isEmpty(interTypeId)) {
+            int wid = Integer.valueOf(interTypeId);
+            if (TWorkloadType.findByValue(wid) == null) {
+                throw new DdlException("error internal type id: " + wid + ", 
current id map:" + WORKLOAD_TYPE_MAP);
+            }
+        }
+
+    }
+
+
+    Optional<Integer> getInternalTypeId() {
+        String typeIdStr = this.properties.get(INTERNAL_TYPE);
+        if (StringUtils.isEmpty(typeIdStr)) {
+            return Optional.empty();
+        }
+        return Optional.of(Integer.valueOf(typeIdStr));
     }
 
     public long getId() {
@@ -535,8 +567,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         return cpuHardLimit;
     }
 
-    public String getTag() {
-        return properties.get(TAG);
+    public Optional<Set<String>> getTag() {
+        String tagStr = properties.get(TAG);
+        if (StringUtils.isEmpty(tagStr)) {
+            return Optional.empty();
+        }
+
+        Set<String> tagSet = new HashSet<>();
+        String[] ss = tagStr.split(",");
+        for (String str : ss) {
+            tagSet.add(str);
+        }
+        return Optional.of(tagSet);
     }
 
     @Override
@@ -550,7 +592,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public TopicInfo toTopicInfo() {
         TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
-        tWorkloadGroupInfo.setId(id);
+
+        long wgId = this.id;
+        Optional<Integer> internalTypeId = getInternalTypeId();
+        if (internalTypeId.isPresent()) {
+            wgId = internalTypeId.get();
+        }
+        tWorkloadGroupInfo.setId(wgId);
+
         tWorkloadGroupInfo.setName(name);
         tWorkloadGroupInfo.setVersion(version);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index a7ffddbf74a..d21bcc5ace5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -42,6 +42,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
@@ -49,7 +50,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -62,6 +62,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
 
     public static final Long DEFAULT_GROUP_ID = 1L;
 
+    public static final String INTERNAL_GROUP_NAME = "_internal";
+
+    // internal_type_id could be converted to workload group id when Workload published to BE
+    // refer WorkloadGroup.toTopicInfo
+    public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue());
+
     public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
             .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
@@ -367,44 +374,84 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
         LOG.info("Create workload group success: {}", workloadGroup);
     }
 
+    public void createInternalWorkloadGroup() {
+        Map<String, String> properties = Maps.newHashMap();
+        // 100 is cgroup v2 default cpu_share value
+        properties.put(WorkloadGroup.CPU_SHARE, "100");
+        properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID));
+        WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties);
+        writeLock();
+        try {
+            if (!nameToWorkloadGroup.containsKey(wg.getName())) {
+                nameToWorkloadGroup.put(wg.getName(), wg);
+                idToWorkloadGroup.put(wg.getId(), wg);
+                Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     // NOTE: used for checking sum value of 100%  for cpu_hard_limit and memory_limit
     //  when create/alter workload group with same tag.
     //  when oldWg is null it means caller is an alter stmt.
     private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
-        String wgTag = newWg.getTag();
-        double sumOfAllMemLimit = 0;
-        int sumOfAllCpuHardLimit = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
-            WorkloadGroup wg = entry.getValue();
-            if (!StringUtils.equals(wgTag, wg.getTag())) {
-                continue;
-            }
+        Optional<Set<String>> newWgTag = newWg.getTag();
+        Set<String> newWgTagSet = null;
+        if (newWgTag.isPresent()) {
+            newWgTagSet = newWgTag.get();
+        } else {
+            newWgTagSet = new HashSet<>();
+            newWgTagSet.add(null);
+        }
 
-            if (oldWg != null && entry.getKey() == oldWg.getId()) {
-                continue;
-            }
+        for (String newWgOneTag : newWgTagSet) {
+            double sumOfAllMemLimit = 0;
+            int sumOfAllCpuHardLimit = 0;
 
-            if (wg.getCpuHardLimit() > 0) {
-                sumOfAllCpuHardLimit += wg.getCpuHardLimit();
-            }
-            if (wg.getMemoryLimitPercent() > 0) {
-                sumOfAllMemLimit += wg.getMemoryLimitPercent();
+            // 1 get sum value of all wg which has same tag without current wg
+            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
+                WorkloadGroup wg = entry.getValue();
+                Optional<Set<String>> wgTag = wg.getTag();
+
+                if (oldWg != null && entry.getKey() == oldWg.getId()) {
+                    continue;
+                }
+
+                if (newWgOneTag == null) {
+                    if (wgTag.isPresent()) {
+                        continue;
+                    }
+                } else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) {
+                    continue;
+                }
+
+                if (wg.getCpuHardLimit() > 0) {
+                    sumOfAllCpuHardLimit += wg.getCpuHardLimit();
+                }
+                if (wg.getMemoryLimitPercent() > 0) {
+                    sumOfAllMemLimit += wg.getMemoryLimitPercent();
+                }
             }
-        }
 
-        sumOfAllMemLimit += newWg.getMemoryLimitPercent();
-        sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
+            // 2. add the current workload group's values
+            sumOfAllMemLimit += newWg.getMemoryLimitPercent();
+            sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
 
-        if (sumOfAllMemLimit > 100.0 + 1e-6) {
-            throw new DdlException(
-                    "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
-                            + " cannot be greater than 100.0%.");
-        }
+            // 3. check the total sums
+            if (sumOfAllMemLimit > 100.0 + 1e-6) {
+                throw new DdlException(
+                        "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + (
+                                newWgTag.isPresent() ? newWgTag.get() : "")
+                                + " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
+            }
 
-        if (sumOfAllCpuHardLimit > 100) {
-            throw new DdlException(
-                    "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
-                            + wgTag + " can not be greater than 100% ");
+            if (sumOfAllCpuHardLimit > 100) {
+                throw new DdlException(
+                        "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
+                                newWgTag.isPresent()
+                                        ? newWgTag.get() : "") + " can not be greater than 100% ");
+            }
         }
     }
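
Distilled from the hunk above: checkGlobalUnlock now validates one tag bucket at a time, with untagged groups forming their own bucket via a null sentinel. Below is a condensed, self-contained sketch of the same accounting; the method name and simplified error message are illustrative, while getTag(), getMemoryLimitPercent() and getCpuHardLimit() are taken from the diff:

    // Hedged sketch of the per-tag accounting introduced above; not the committed
    // code verbatim. A null tag stands in for "no tag"; limits are percentages.
    static void checkPerTagSums(Map<Long, WorkloadGroup> idToWorkloadGroup,
            WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
        Set<String> tagBuckets = newWg.getTag().orElseGet(() -> {
            Set<String> s = new HashSet<>();
            s.add(null); // sentinel bucket for untagged groups
            return s;
        });
        for (String tag : tagBuckets) {
            double memSum = newWg.getMemoryLimitPercent();
            int cpuSum = newWg.getCpuHardLimit();
            for (Map.Entry<Long, WorkloadGroup> e : idToWorkloadGroup.entrySet()) {
                if (oldWg != null && e.getKey().equals(oldWg.getId())) {
                    continue; // alter: exclude the pre-alter version of this group
                }
                Optional<Set<String>> wgTag = e.getValue().getTag();
                boolean sameBucket = (tag == null)
                        ? !wgTag.isPresent()
                        : wgTag.isPresent() && wgTag.get().contains(tag);
                if (!sameBucket) {
                    continue;
                }
                memSum += Math.max(e.getValue().getMemoryLimitPercent(), 0);
                cpuSum += Math.max(e.getValue().getCpuHardLimit(), 0);
            }
            if (memSum > 100.0 + 1e-6 || cpuSum > 100) {
                throw new DdlException("limit sum exceeds 100% within tag " + tag);
            }
        }
    }

The committed version inlines this loop in checkGlobalUnlock and reports the offending property in the exception message.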
 
@@ -438,8 +485,8 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
 
     public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
         String workloadGroupName = stmt.getWorkloadGroupName();
-        if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
-            throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");
+        if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
+            throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed");
         }
 
         // if a workload group exists in user property, it should not be dropped
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 5f1e3565966..d729881358e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -235,4 +235,226 @@ public class WorkloadGroupMgrTest {
         }
         Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5);
     }
+
+    @Test
+    public void testMultiTagCreateWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+            {
+                String name = "empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                try {
+                    CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn100");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g5";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g6";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g7";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                try {
+                    CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+    }
+
+
+    @Test
+    public void testMultiTagAlterWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+            {
+                String name = "empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties);
+                try {
+                    workloadGroupMgr.alterWorkloadGroup(alterStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+    }
+
+
+    @Test
+    public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            // create workload groups without a tag
+            {
+                String name = "no_tag_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "no_tag_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "no_tag_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                try {
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+            {
+                String name = "no_tag_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+    }
 }
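
One caveat about the try/catch pattern in the tests above: the message is asserted only when an exception is actually thrown, so a case where createWorkloadGroup unexpectedly succeeds passes silently. A tighter formulation, hypothetical and not part of this commit, assuming JUnit 4.13+ is available to the FE tests:

    // Hypothetical tightening (not in this commit): fails when no exception is thrown.
    DdlException e = Assert.assertThrows(DdlException.class,
            () -> workloadGroupMgr.createWorkloadGroup(createStmt));
    Assert.assertTrue(e.getMessage().contains(
            "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));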
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index aa5ebf83292..76eb0fbb88a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -153,6 +153,7 @@ public abstract class TestWithFeService {
     @BeforeAll
     public final void beforeAll() throws Exception {
         FeConstants.enableInternalSchemaDb = false;
+        FeConstants.shouldCreateInternalWorkloadGroup = false;
         beforeCreatingConnectContext();
         connectContext = createDefaultCtx();
         beforeCluster();
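
The new FeConstants.shouldCreateInternalWorkloadGroup flag lets the FE test harness opt out of internal-group creation. The startup-side consumer of the flag is not included in this excerpt, so the sketch below is an assumption about the gating; only the flag and createInternalWorkloadGroup() appear in this diff, and the getWorkloadGroupMgr() accessor is assumed:

    // Hedged sketch: how the flag above presumably gates startup creation of the
    // "_internal" group. The enclosing startup hook is an assumption.
    if (FeConstants.shouldCreateInternalWorkloadGroup) {
        Env.getCurrentEnv().getWorkloadGroupMgr().createInternalWorkloadGroup();
    }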
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 6a5e4035066..e2cd9f3572d 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -243,6 +243,9 @@ struct TPublishTopicResult {
     1: required Status.TStatus status
 }
 
+enum TWorkloadType {
+    INTERNAL = 2
+}
 
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in backend.
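
The FE constant INTERNAL_TYPE_ID in the WorkloadGroupMgr hunk earlier mirrors this enum value, so the internal type id round-trips through the FE-to-BE topic publish. A minimal illustration; findByValue is the standard accessor on Thrift-generated Java enums, and its use here is illustrative rather than taken from this commit:

    // Illustrative only: decode the stored id back to the Thrift enum.
    Long internalTypeId = Long.valueOf(TWorkloadType.INTERNAL.getValue()); // 2L
    TWorkloadType decoded = TWorkloadType.findByValue(internalTypeId.intValue());
    assert decoded == TWorkloadType.INTERNAL;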
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index b272c67c85e..96de9535314 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -168,6 +168,30 @@ suite("test_crud_wlg") {
         exception "can not be greater than 100%"
     }
 
+    // test alter tag and type
+    test {
+        sql "alter workload group test_group properties ( 'internal_type'='13' );"
+
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "create workload group inter_wg properties('internal_type'='123');"
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "alter workload group normal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
+    test {
+        sql "alter workload group _internal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
     sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
     qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
     qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
@@ -475,6 +499,11 @@ suite("test_crud_wlg") {
 
 
     // test workload group's tag property, cpu_hard_limit
+    test {
+        sql "create workload group tag_test properties('tag'=' a, b , c ');"
+        exception "tag format is illegal"
+    }
+
     test {
         sql "create workload group if not exists tag1_wg1 properties (  'cpu_hard_limit'='101%', 'tag'='tag1')"
         exception "must be a positive integer"
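
The tag_test case added at the top of this hunk rejects ' a, b , c '. The FE-side validation lives in WorkloadGroup property parsing, which this excerpt omits; the sketch below is a hypothetical reconstruction, and both the pattern and the method name are assumptions:

    import java.util.regex.Pattern;

    // Hypothetical sketch of a tag-format check like the one the test above
    // exercises; the committed implementation is not shown in this excerpt.
    public class TagFormatCheck {
        private static final Pattern TAG_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+$");

        static void checkTagFormat(String tagStr) {
            for (String tag : tagStr.split(",")) {
                if (!TAG_PATTERN.matcher(tag).matches()) {
                    // ' a, b , c ' fails here: embedded spaces are not allowed
                    throw new IllegalArgumentException("tag format is illegal: " + tag);
                }
            }
        }
    }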


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
