This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 20e2d2e2f87fbcc8c459605fb578215ed76dfb80 Author: wangbo <wan...@apache.org> AuthorDate: Sat May 11 23:31:59 2024 +0800 [Fix](executor)Fix workload thread start failed when follower convert to master --- .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../resource/workloadgroup/WorkloadGroupMgr.java | 27 +++---- .../WorkloadRuntimeStatusMgr.java | 64 ++++++---------- .../WorkloadSchedPolicyMgr.java | 88 ++++++++++------------ .../workloadgroup/WorkloadGroupMgrTest.java | 33 -------- .../data/workload_manager_p0/test_curd_wlg.out | 9 +++ regression-test/pipeline/p0/conf/fe.conf | 1 + .../workload_manager_p0/test_curd_wlg.groovy | 6 ++ 8 files changed, 89 insertions(+), 141 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 0aa0da8ef36..4839769e0f8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1711,7 +1711,7 @@ public class Env { dnsCache.start(); - workloadGroupMgr.startUpdateThread(); + workloadGroupMgr.start(); workloadSchedPolicyMgr.start(); workloadRuntimeStatusMgr.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index a06413a94d4..2cfc59dfd2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -34,6 +34,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.common.proc.ProcResult; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.DropWorkloadGroupOperationLog; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -64,7 +65,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class WorkloadGroupMgr implements Writable, GsonPostProcessable { +public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPostProcessable { public static final String DEFAULT_GROUP_NAME = "normal"; @@ -90,22 +91,13 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { private final ResourceProcNode procNode = new ResourceProcNode(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private Thread updatePropThread; - - public void startUpdateThread() { - WorkloadGroupMgr wgMgr = this; - updatePropThread = new Thread(() -> { - Thread.currentThread().setName("reset-query-queue-prop"); - while (true) { - try { - wgMgr.resetQueryQueueProp(); - Thread.sleep(Config.query_queue_update_interval_ms); - } catch (Throwable e) { - LOG.warn("reset query queue failed ", e); - } - } - }); - updatePropThread.start(); + @Override + protected void runAfterCatalogReady() { + try { + resetQueryQueueProp(); + } catch (Throwable e) { + LOG.warn("reset query queue failed ", e); + } } public void resetQueryQueueProp() { @@ -142,6 +134,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } public WorkloadGroupMgr() { + super("workload-group-thread", Config.query_queue_update_interval_ms); // if no fe image exist, we should append internal group here. appendInternalWorkloadGroup(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index ff2641d5f3c..de4810b65ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -19,7 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.plugin.audit.AuditEvent; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; @@ -37,7 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class WorkloadRuntimeStatusMgr { +public class WorkloadRuntimeStatusMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap(); @@ -46,44 +46,34 @@ public class WorkloadRuntimeStatusMgr { private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock(); private List<AuditEvent> queryAuditEventList = Lists.newLinkedList(); - class WorkloadRuntimeStatsThread extends Daemon { - - WorkloadRuntimeStatusMgr workloadStatsMgr; - - public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName, - int interval) { - super(threadName, interval); - this.workloadStatsMgr = workloadRuntimeStatusMgr; - } + public WorkloadRuntimeStatusMgr() { + super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms); + } - @Override - protected void runOneCycle() { - // 1 merge be query statistics - Map<String, TQueryStatistics> queryStatisticsMap = workloadStatsMgr.getQueryStatisticsMap(); - - // 2 log query audit - List<AuditEvent> auditEventList = workloadStatsMgr.getQueryNeedAudit(); - for (AuditEvent auditEvent : auditEventList) { - TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); - if (queryStats != null) { - auditEvent.scanRows = queryStats.scan_rows; - auditEvent.scanBytes = queryStats.scan_bytes; - auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; - auditEvent.cpuTimeMs = queryStats.cpu_ms; - auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; - auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; - } - Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); + @Override + protected void runAfterCatalogReady() { + // 1 merge be query statistics + Map<String, TQueryStatistics> queryStatisticsMap = getQueryStatisticsMap(); + + // 2 log query audit + List<AuditEvent> auditEventList = getQueryNeedAudit(); + for (AuditEvent auditEvent : auditEventList) { + TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); + if (queryStats != null) { + auditEvent.scanRows = queryStats.scan_rows; + auditEvent.scanBytes = queryStats.scan_bytes; + auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; + auditEvent.cpuTimeMs = queryStats.cpu_ms; + auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; + auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; } - - // 3 clear beToQueryStatsMap when be report timeout - workloadStatsMgr.clearReportTimeoutBeStatistics(); + Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); } + // 3 clear beToQueryStatsMap when be report timeout + clearReportTimeoutBeStatistics(); } - private Daemon thread = null; - public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { @@ -116,12 +106,6 @@ public class WorkloadRuntimeStatusMgr { return ret; } - public void start() { - thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread", - Config.workload_runtime_status_thread_interval_ms); - thread.start(); - } - public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) { if (!params.isSetBackendId()) { LOG.warn("be report workload runtime status but without beid"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index ee74d4a506f..4aa7563f8d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -59,7 +60,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { +public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class); @@ -69,6 +70,10 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { private PolicyProcNode policyProcNode = new PolicyProcNode(); + public WorkloadSchedPolicyMgr() { + super("workload-sched-thread", Config.workload_sched_policy_interval_ms); + } + public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES = new ImmutableList.Builder<String>() .add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version") @@ -99,60 +104,43 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { } }; - private Thread policyExecThread = new Thread() { - - @Override - public void run() { - while (true) { - try { - // todo(wb) add more query info source, not only comes from connectionmap - // 1 get query info map - Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler() - .getConnectionMap(); - List<WorkloadQueryInfo> queryInfoList = new ArrayList<>(); - - // a snapshot for connect context - Set<Integer> keySet = new HashSet<>(); - keySet.addAll(connectMap.keySet()); - - for (Integer connectId : keySet) { - ConnectContext cctx = connectMap.get(connectId); - if (cctx == null || cctx.isKilled()) { - continue; - } - - String username = cctx.getQualifiedUser(); - WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo(); - policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId()); - policyQueryInfo.tUniqueId = cctx.queryId(); - policyQueryInfo.context = cctx; - policyQueryInfo.metricMap = new HashMap<>(); - policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username); + @Override + protected void runAfterCatalogReady() { + try { + // todo(wb) add more query info source, not only comes from connectionmap + // 1 get query info map + Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler() + .getConnectionMap(); + List<WorkloadQueryInfo> queryInfoList = new ArrayList<>(); + + // a snapshot for connect context + Set<Integer> keySet = new HashSet<>(); + keySet.addAll(connectMap.keySet()); + + for (Integer connectId : keySet) { + ConnectContext cctx = connectMap.get(connectId); + if (cctx == null || cctx.isKilled()) { + continue; + } - queryInfoList.add(policyQueryInfo); - } + String username = cctx.getQualifiedUser(); + WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo(); + policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId()); + policyQueryInfo.tUniqueId = cctx.queryId(); + policyQueryInfo.context = cctx; + policyQueryInfo.metricMap = new HashMap<>(); + policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username); - // 2 exec policy - if (queryInfoList.size() > 0) { - execPolicy(queryInfoList); - } - } catch (Throwable t) { - LOG.error("[policy thread]error happens when exec policy"); - } + queryInfoList.add(policyQueryInfo); + } - // 3 sleep - try { - Thread.sleep(Config.workload_sched_policy_interval_ms); - } catch (InterruptedException e) { - LOG.error("error happends when policy exec thread sleep"); - } + // 2 exec policy + if (queryInfoList.size() > 0) { + execPolicy(queryInfoList); } + } catch (Throwable t) { + LOG.error("[policy thread]error happens when exec policy"); } - }; - - public void start() { - policyExecThread.setName("workload-auto-scheduler-thread"); - policyExecThread.start(); } public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java index 1a9fac1ea7a..1e73dc79510 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java @@ -19,7 +19,6 @@ package org.apache.doris.resource.workloadgroup; import org.apache.doris.analysis.AlterWorkloadGroupStmt; import org.apache.doris.analysis.CreateWorkloadGroupStmt; -import org.apache.doris.analysis.DropWorkloadGroupStmt; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -190,38 +189,6 @@ public class WorkloadGroupMgrTest { } } - @Test - public void testDropWorkloadGroup() throws UserException { - Config.enable_workload_group = true; - ConnectContext context = new ConnectContext(); - WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); - Map<String, String> properties = Maps.newHashMap(); - properties.put(WorkloadGroup.CPU_SHARE, "10"); - properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); - String name = "g1"; - CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); - workloadGroupMgr.createWorkloadGroup(createStmt); - context.getSessionVariable().setWorkloadGroup(name); - Assert.assertEquals(1, workloadGroupMgr.getWorkloadGroup(context).size()); - - DropWorkloadGroupStmt dropStmt = new DropWorkloadGroupStmt(false, name); - workloadGroupMgr.dropWorkloadGroup(dropStmt); - try { - context.getSessionVariable().setWorkloadGroup(name); - workloadGroupMgr.getWorkloadGroup(context); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("does not exist")); - } - - DropWorkloadGroupStmt dropDefaultStmt = new DropWorkloadGroupStmt(false, WorkloadGroupMgr.DEFAULT_GROUP_NAME); - try { - workloadGroupMgr.dropWorkloadGroup(dropDefaultStmt); - } catch (DdlException e) { - Assert.assertTrue(e.getMessage().contains("is not allowed")); - } - } - @Test public void testAlterWorkloadGroup() throws UserException { Config.enable_workload_group = true; diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index 876be32601a..d675e0d7b79 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -9,6 +9,15 @@ normal 20 50% true 2147483647 0 0 1% 16 test_group 10 10% true 2147483647 0 0 -1 -1 +-- !show_del_wg_1 -- +normal 20 50% true 2147483647 0 0 1% 16 +test_drop_wg 10 0% true 2147483647 0 0 -1 -1 +test_group 10 10% true 2147483647 0 0 -1 -1 + +-- !show_del_wg_2 -- +normal 20 50% true 2147483647 0 0 1% 16 +test_group 10 10% true 2147483647 0 0 -1 -1 + -- !mem_limit_1 -- 2 diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 28f1972c701..3233d67dd56 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -110,6 +110,7 @@ enable_job_schedule_second_for_test = true enable_workload_group = true publish_topic_info_interval_ms = 1000 +workload_sched_policy_interval_ms = 1000 master_sync_policy = WRITE_NO_SYNC replica_sync_policy = WRITE_NO_SYNC diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 05034529726..62eb762ff9b 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -128,6 +128,12 @@ suite("test_crud_wlg") { qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group') order by name;" + // test drop workload group + sql "create workload group if not exists test_drop_wg properties ('cpu_share'='10')" + qt_show_del_wg_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group','test_drop_wg') order by name;" + sql "drop workload group test_drop_wg" + qt_show_del_wg_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group','test_drop_wg') order by name;" + // test memory_limit test { sql "alter workload group test_group properties ( 'memory_limit'='100%' );" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org