This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 20e2d2e2f87fbcc8c459605fb578215ed76dfb80
Author: wangbo <wan...@apache.org>
AuthorDate: Sat May 11 23:31:59 2024 +0800

    [Fix](executor) Fix workload thread start failure when follower converts to master
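
    The diff below replaces the hand-rolled threads in WorkloadGroupMgr,
    WorkloadRuntimeStatusMgr and WorkloadSchedPolicyMgr with the MasterDaemon
    pattern, so the daemon framework owns the thread lifecycle across
    follower-to-master transitions. A minimal sketch of that pattern, assuming
    MasterDaemon schedules runAfterCatalogReady() at the interval passed to its
    constructor once the FE is ready (FooMgr, foo_interval_ms and doPeriodicWork
    are hypothetical names, not part of this commit):

        import org.apache.doris.common.Config;
        import org.apache.doris.common.util.MasterDaemon;

        import org.apache.logging.log4j.LogManager;
        import org.apache.logging.log4j.Logger;

        public class FooMgr extends MasterDaemon {
            private static final Logger LOG = LogManager.getLogger(FooMgr.class);

            public FooMgr() {
                // daemon thread name and poll interval, mirroring the constructors added below
                super("foo-thread", Config.foo_interval_ms);
            }

            @Override
            protected void runAfterCatalogReady() {
                // the daemon framework calls this periodically once the FE catalog is ready,
                // replacing the hand-rolled while(true)/sleep loops removed in this commit
                try {
                    doPeriodicWork();
                } catch (Throwable t) {
                    LOG.warn("periodic work failed", t);
                }
            }

            private void doPeriodicWork() {
                // placeholder for the manager-specific work (e.g. resetting queue properties)
            }
        }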
---
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +-
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 27 +++----
 .../WorkloadRuntimeStatusMgr.java                  | 64 ++++++----------
 .../WorkloadSchedPolicyMgr.java                    | 88 ++++++++++------------
 .../workloadgroup/WorkloadGroupMgrTest.java        | 33 --------
 .../data/workload_manager_p0/test_curd_wlg.out     |  9 +++
 regression-test/pipeline/p0/conf/fe.conf           |  1 +
 .../workload_manager_p0/test_curd_wlg.groovy       |  6 ++
 8 files changed, 89 insertions(+), 141 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0aa0da8ef36..4839769e0f8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1711,7 +1711,7 @@ public class Env {
 
         dnsCache.start();
 
-        workloadGroupMgr.startUpdateThread();
+        workloadGroupMgr.start();
         workloadSchedPolicyMgr.start();
         workloadRuntimeStatusMgr.start();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index a06413a94d4..2cfc59dfd2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.proc.ProcResult;
+import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.DropWorkloadGroupOperationLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -64,7 +65,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
+public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPostProcessable {
 
     public static final String DEFAULT_GROUP_NAME = "normal";
 
@@ -90,22 +91,13 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
     private final ResourceProcNode procNode = new ResourceProcNode();
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private Thread updatePropThread;
-
-    public void startUpdateThread() {
-        WorkloadGroupMgr wgMgr = this;
-        updatePropThread = new Thread(() -> {
-            Thread.currentThread().setName("reset-query-queue-prop");
-            while (true) {
-                try {
-                    wgMgr.resetQueryQueueProp();
-                    Thread.sleep(Config.query_queue_update_interval_ms);
-                } catch (Throwable e) {
-                    LOG.warn("reset query queue failed ", e);
-                }
-            }
-        });
-        updatePropThread.start();
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            resetQueryQueueProp();
+        } catch (Throwable e) {
+            LOG.warn("reset query queue failed ", e);
+        }
     }
 
     public void resetQueryQueueProp() {
@@ -142,6 +134,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
     }
 
     public WorkloadGroupMgr() {
+        super("workload-group-thread", Config.query_queue_update_interval_ms);
         // if no fe image exist, we should append internal group here.
         appendInternalWorkloadGroup();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index ff2641d5f3c..de4810b65ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,7 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.util.Daemon;
+import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.audit.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
@@ -37,7 +37,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class WorkloadRuntimeStatusMgr {
+public class WorkloadRuntimeStatusMgr extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
     private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
@@ -46,44 +46,34 @@ public class WorkloadRuntimeStatusMgr {
     private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
-    class WorkloadRuntimeStatsThread extends Daemon {
-
-        WorkloadRuntimeStatusMgr workloadStatsMgr;
-
-        public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName,
-                int interval) {
-            super(threadName, interval);
-            this.workloadStatsMgr = workloadRuntimeStatusMgr;
-        }
+    public WorkloadRuntimeStatusMgr() {
+        super("workload-runtime-stats-thread", 
Config.workload_runtime_status_thread_interval_ms);
+    }
 
-        @Override
-        protected void runOneCycle() {
-            // 1 merge be query statistics
-            Map<String, TQueryStatistics> queryStatisticsMap = 
workloadStatsMgr.getQueryStatisticsMap();
-
-            // 2 log query audit
-            List<AuditEvent> auditEventList = 
workloadStatsMgr.getQueryNeedAudit();
-            for (AuditEvent auditEvent : auditEventList) {
-                TQueryStatistics queryStats = 
queryStatisticsMap.get(auditEvent.queryId);
-                if (queryStats != null) {
-                    auditEvent.scanRows = queryStats.scan_rows;
-                    auditEvent.scanBytes = queryStats.scan_bytes;
-                    auditEvent.peakMemoryBytes = 
queryStats.max_peak_memory_bytes;
-                    auditEvent.cpuTimeMs = queryStats.cpu_ms;
-                    auditEvent.shuffleSendBytes = 
queryStats.shuffle_send_bytes;
-                    auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
-                }
-                
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
+    @Override
+    protected void runAfterCatalogReady() {
+        // 1 merge be query statistics
+        Map<String, TQueryStatistics> queryStatisticsMap = 
getQueryStatisticsMap();
+
+        // 2 log query audit
+        List<AuditEvent> auditEventList = getQueryNeedAudit();
+        for (AuditEvent auditEvent : auditEventList) {
+            TQueryStatistics queryStats = 
queryStatisticsMap.get(auditEvent.queryId);
+            if (queryStats != null) {
+                auditEvent.scanRows = queryStats.scan_rows;
+                auditEvent.scanBytes = queryStats.scan_bytes;
+                auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
+                auditEvent.cpuTimeMs = queryStats.cpu_ms;
+                auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
+                auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
             }
-
-            // 3 clear beToQueryStatsMap when be report timeout
-            workloadStatsMgr.clearReportTimeoutBeStatistics();
+            Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
         }
 
+        // 3 clear beToQueryStatsMap when be report timeout
+        clearReportTimeoutBeStatistics();
     }
 
-    private Daemon thread = null;
-
     public void submitFinishQueryToAudit(AuditEvent event) {
         queryAuditEventLogWriteLock();
         try {
@@ -116,12 +106,6 @@ public class WorkloadRuntimeStatusMgr {
         return ret;
     }
 
-    public void start() {
-        thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread",
-                Config.workload_runtime_status_thread_interval_ms);
-        thread.start();
-    }
-
     public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
         if (!params.isSetBackendId()) {
             LOG.warn("be report workload runtime status but without beid");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index ee74d4a506f..4aa7563f8d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -59,7 +60,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
+public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, GsonPostProcessable {
 
     private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class);
 
@@ -69,6 +70,10 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
 
     private PolicyProcNode policyProcNode = new PolicyProcNode();
 
+    public WorkloadSchedPolicyMgr() {
+        super("workload-sched-thread", 
Config.workload_sched_policy_interval_ms);
+    }
+
     public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
             = new ImmutableList.Builder<String>()
             .add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
@@ -99,60 +104,43 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
         }
     };
 
-    private Thread policyExecThread = new Thread() {
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    // todo(wb) add more query info source, not only comes from connectionmap
-                    // 1 get query info map
-                    Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
-                            .getConnectionMap();
-                    List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
-
-                    // a snapshot for connect context
-                    Set<Integer> keySet = new HashSet<>();
-                    keySet.addAll(connectMap.keySet());
-
-                    for (Integer connectId : keySet) {
-                        ConnectContext cctx = connectMap.get(connectId);
-                        if (cctx == null || cctx.isKilled()) {
-                            continue;
-                        }
-
-                        String username = cctx.getQualifiedUser();
-                        WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
-                        policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
-                        policyQueryInfo.tUniqueId = cctx.queryId();
-                        policyQueryInfo.context = cctx;
-                        policyQueryInfo.metricMap = new HashMap<>();
-                        policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            // todo(wb) add more query info source, not only comes from connectionmap
+            // 1 get query info map
+            Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
+                    .getConnectionMap();
+            List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
+
+            // a snapshot for connect context
+            Set<Integer> keySet = new HashSet<>();
+            keySet.addAll(connectMap.keySet());
+
+            for (Integer connectId : keySet) {
+                ConnectContext cctx = connectMap.get(connectId);
+                if (cctx == null || cctx.isKilled()) {
+                    continue;
+                }
 
-                        queryInfoList.add(policyQueryInfo);
-                    }
+                String username = cctx.getQualifiedUser();
+                WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
+                policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
+                policyQueryInfo.tUniqueId = cctx.queryId();
+                policyQueryInfo.context = cctx;
+                policyQueryInfo.metricMap = new HashMap<>();
+                policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
 
-                    // 2 exec policy
-                    if (queryInfoList.size() > 0) {
-                        execPolicy(queryInfoList);
-                    }
-                } catch (Throwable t) {
-                    LOG.error("[policy thread]error happens when exec policy");
-                }
+                queryInfoList.add(policyQueryInfo);
+            }
 
-                // 3 sleep
-                try {
-                    Thread.sleep(Config.workload_sched_policy_interval_ms);
-                } catch (InterruptedException e) {
-                    LOG.error("error happends when policy exec thread sleep");
-                }
+            // 2 exec policy
+            if (queryInfoList.size() > 0) {
+                execPolicy(queryInfoList);
             }
+        } catch (Throwable t) {
+            LOG.error("[policy thread]error happens when exec policy");
         }
-    };
-
-    public void start() {
-        policyExecThread.setName("workload-auto-scheduler-thread");
-        policyExecThread.start();
     }
 
     public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 1a9fac1ea7a..1e73dc79510 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -19,7 +19,6 @@ package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.analysis.AlterWorkloadGroupStmt;
 import org.apache.doris.analysis.CreateWorkloadGroupStmt;
-import org.apache.doris.analysis.DropWorkloadGroupStmt;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -190,38 +189,6 @@ public class WorkloadGroupMgrTest {
         }
     }
 
-    @Test
-    public void testDropWorkloadGroup() throws UserException {
-        Config.enable_workload_group = true;
-        ConnectContext context = new ConnectContext();
-        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(WorkloadGroup.CPU_SHARE, "10");
-        properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
-        String name = "g1";
-        CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
-        workloadGroupMgr.createWorkloadGroup(createStmt);
-        context.getSessionVariable().setWorkloadGroup(name);
-        Assert.assertEquals(1, workloadGroupMgr.getWorkloadGroup(context).size());
-
-        DropWorkloadGroupStmt dropStmt = new DropWorkloadGroupStmt(false, name);
-        workloadGroupMgr.dropWorkloadGroup(dropStmt);
-        try {
-            context.getSessionVariable().setWorkloadGroup(name);
-            workloadGroupMgr.getWorkloadGroup(context);
-            Assert.fail();
-        } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("does not exist"));
-        }
-
-        DropWorkloadGroupStmt dropDefaultStmt = new DropWorkloadGroupStmt(false, WorkloadGroupMgr.DEFAULT_GROUP_NAME);
-        try {
-            workloadGroupMgr.dropWorkloadGroup(dropDefaultStmt);
-        } catch (DdlException e) {
-            Assert.assertTrue(e.getMessage().contains("is not allowed"));
-        }
-    }
-
     @Test
     public void testAlterWorkloadGroup() throws UserException {
         Config.enable_workload_group = true;
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index 876be32601a..d675e0d7b79 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -9,6 +9,15 @@
 normal 20      50%     true    2147483647      0       0       1%      16      
 test_group     10      10%     true    2147483647      0       0       -1      -1      
 
+-- !show_del_wg_1 --
+normal 20      50%     true    2147483647      0       0       1%      16      
+test_drop_wg   10      0%      true    2147483647      0       0       -1      -1      
+test_group     10      10%     true    2147483647      0       0       -1      -1      
+
+-- !show_del_wg_2 --
+normal 20      50%     true    2147483647      0       0       1%      16      
+test_group     10      10%     true    2147483647      0       0       -1      -1      
+
 -- !mem_limit_1 --
 2
 
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 28f1972c701..3233d67dd56 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -110,6 +110,7 @@ enable_job_schedule_second_for_test = true
 
 enable_workload_group = true
 publish_topic_info_interval_ms = 1000
+workload_sched_policy_interval_ms = 1000
 
 master_sync_policy = WRITE_NO_SYNC
 replica_sync_policy = WRITE_NO_SYNC
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 05034529726..62eb762ff9b 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -128,6 +128,12 @@ suite("test_crud_wlg") {
 
     qt_show_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag
 from information_schema.workload_groups where name in ('normal','test_group') 
order by name;"
 
+    // test drop workload group
+    sql "create workload group if not exists test_drop_wg properties 
('cpu_share'='10')"
+    qt_show_del_wg_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag
 from information_schema.workload_groups where name in 
('normal','test_group','test_drop_wg') order by name;"
+    sql "drop workload group test_drop_wg"
+    qt_show_del_wg_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group','test_drop_wg') order by name;"
+
     // test memory_limit
     test {
         sql "alter workload group test_group properties ( 
'memory_limit'='100%' );"

