This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0af0a47324f Fix alter policy failed (#33910) 0af0a47324f is described below commit 0af0a47324f42a24ace24e648e2b58501bd08e67 Author: wangbo <wan...@apache.org> AuthorDate: Mon Apr 22 12:52:23 2024 +0800 Fix alter policy failed (#33910) --- .../schema_workload_sched_policy_scanner.cpp | 1 + .../java/org/apache/doris/catalog/SchemaTable.java | 1 + .../workloadschedpolicy/WorkloadSchedPolicy.java | 8 ++++++-- .../WorkloadSchedPolicyMgr.java | 2 +- .../test_workload_sched_policy.out | 15 ++++++++++++++ .../test_workload_sched_policy.groovy | 24 +++++++++++++++++++++- 6 files changed, 47 insertions(+), 4 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp index 725544ad5a5..3dae2714f02 100644 --- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp @@ -34,6 +34,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadSchedulePolicyScanner::_s_t {"PRIORITY", TYPE_INT, sizeof(int32_t), true}, {"ENABLED", TYPE_BOOLEAN, sizeof(bool), true}, {"VERSION", TYPE_INT, sizeof(int32_t), true}, + {"WORKLOAD_GROUP", TYPE_STRING, sizeof(StringRef), true}, }; SchemaWorkloadSchedulePolicyScanner::SchemaWorkloadSchedulePolicyScanner() diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index a8884c61a55..d0b828fdba2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -508,6 +508,7 @@ public class SchemaTable extends Table { .column("PRIORITY", ScalarType.createType(PrimitiveType.INT)) .column("ENABLED", ScalarType.createType(PrimitiveType.BOOLEAN)) .column("VERSION", ScalarType.createType(PrimitiveType.INT)) + .column("WORKLOAD_GROUP", ScalarType.createStringType()) .build())) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 2f8706c574b..55759e90972 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -172,7 +172,7 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { return retType; } - public void updateProperty(Map<String, String> property, List<Long> wgIdList) { + public void updatePropertyIfNotNull(Map<String, String> property, List<Long> wgIdList) { String enabledStr = property.get(ENABLED); if (enabledStr != null) { this.enabled = Boolean.parseBoolean(enabledStr); @@ -183,7 +183,11 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { this.priority = Integer.parseInt(priorityStr); } - if (wgIdList.size() > 0) { + String workloadGroupIdStr = property.get(WORKLOAD_GROUP); + // workloadGroupIdStr != null means user set workload group property, + // then we should overwrite policy's workloadGroupIdList + // if workloadGroupIdStr.length == 0, it means the policy should match all query. + if (workloadGroupIdStr != null) { this.workloadGroupIdList = wgIdList; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 9e6eb33ffbf..ee74d4a506f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -435,7 +435,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { Map<String, String> properties = alterStmt.getProperties(); List<Long> wgIdList = new ArrayList<>(); checkProperties(properties, wgIdList); - policy.updateProperty(properties, wgIdList); + policy.updatePropertyIfNotNull(properties, wgIdList); policy.incrementVersion(); Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy); } finally { diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out index 65b4c1901b6..3152367e9a1 100644 --- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out +++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out @@ -7,3 +7,18 @@ test_cancel_policy query_time > 10 cancel_query 0 false 0 -- !select_policy_tvf_after_drop -- +-- !select_alter_1 -- +test_alter_policy username = test_alter_policy_user set_session_variable "parallel_pipeline_task_num=0" 0 true 0 normal + +-- !select_alter_2 -- +test_alter_policy username = test_alter_policy_user set_session_variable "parallel_pipeline_task_num=0" 0 true 1 + +-- !select_alter_3 -- +test_alter_policy username = test_alter_policy_user set_session_variable "parallel_pipeline_task_num=0" 0 false 2 + +-- !select_alter_4 -- +test_alter_policy username = test_alter_policy_user set_session_variable "parallel_pipeline_task_num=0" 9 false 3 + +-- !select_alter_5 -- +test_alter_policy username = test_alter_policy_user set_session_variable "parallel_pipeline_task_num=0" 9 false 4 normal + diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index d8ab2611094..776209fa11e 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -149,7 +149,7 @@ suite("test_workload_sched_policy") { } assertEquals("parallel_pipeline_task_num", result3[0][0]) assertEquals("33", result3[0][1]) - + sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');" sql "drop workload schedule policy if exists test_set_var_policy;" @@ -180,6 +180,28 @@ suite("test_workload_sched_policy") { exception "unknown workload group" } + // test alter policy property + sql "drop user if exists test_alter_policy_user" + sql "CREATE USER 'test_alter_policy_user'@'%' IDENTIFIED BY '12345';" + sql "drop workload schedule policy if exists test_alter_policy;" + sql "create workload schedule policy test_alter_policy conditions(username='test_alter_policy_user') actions(set_session_variable 'parallel_pipeline_task_num=0') properties('workload_group'='normal');" + qt_select_alter_1 "select name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from information_schema.workload_schedule_policy where name='test_alter_policy'" + + sql "alter workload schedule policy test_alter_policy properties('workload_group'='');" + qt_select_alter_2 "select name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from information_schema.workload_schedule_policy where name='test_alter_policy'" + + sql "alter workload schedule policy test_alter_policy properties('enabled'='false');" + qt_select_alter_3 "select name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from information_schema.workload_schedule_policy where name='test_alter_policy'" + + sql "alter workload schedule policy test_alter_policy properties('priority'='9');" + qt_select_alter_4 "select name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from information_schema.workload_schedule_policy where name='test_alter_policy'" + + sql "alter workload schedule policy test_alter_policy properties('workload_group'='normal');" + qt_select_alter_5 "select name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from information_schema.workload_schedule_policy where name='test_alter_policy'" + + sql "drop user test_alter_policy_user" + sql "drop workload schedule policy test_alter_policy" + // daemon thread alter test def thread1 = new Thread({ def startTime = System.currentTimeMillis() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org