This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 0242f876c18 (pick-2.0-26031)[Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031) (#26139)
0242f876c18 is described below

commit 0242f876c18ab2e095e0c16a69757b32520269d9
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Wed Nov 1 21:03:01 2023 +0800

    (pick-2.0-26031)[Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031) (#26139)
---
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +
 .../apache/doris/common/CustomThreadFactory.java   | 46 ++++++++++++++++++++++
 .../org/apache/doris/load/loadv2/LoadManager.java  |  5 +++
 .../apache/doris/load/loadv2/MysqlLoadManager.java | 17 +++++---
 .../org/apache/doris/load/loadv2/TokenManager.java | 18 +++++----
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++--
 .../org/apache/doris/mtmv/MTMVTaskManager.java     |  6 ++-
 .../apache/doris/load/loadv2/TokenManagerTest.java |  2 +
 8 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f58fcb9287c..66f80a0a4c3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1511,6 +1511,8 @@ public class Env {
 
     // start threads that should running on all FE
     private void startNonMasterDaemonThreads() {
+        // start load manager thread
+        loadManager.start();
         tabletStatMgr.start();
         // load and export job label cleaner thread
         labelCleaner.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
new file mode 100644
index 00000000000..153131ec251
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CustomThreadFactory implements ThreadFactory {
+    private final AtomicInteger poolNumber = new AtomicInteger(1);
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    public CustomThreadFactory(String name) {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+        namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement(), 0);
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+            t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 73d4d1a57ad..3f05e2c5a98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -104,6 +104,11 @@ public class LoadManager implements Writable {
         this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
     }
 
+    public void start() {
+        tokenManager.start();
+        mysqlLoadManager.start();
+    }
+
     /**
      * This method will be invoked by the broker load(v2) now.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index ae1871e0d9c..42583506c56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
@@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit;
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
-    private final ThreadPoolExecutor mysqlLoadPool;
+    private  ThreadPoolExecutor mysqlLoadPool;
     private final TokenManager tokenManager;
 
     private static class MySqlLoadContext {
@@ -137,14 +138,20 @@ public class MysqlLoadManager {
     }
 
     private final Map<String, MySqlLoadContext> loadContextMap = new 
ConcurrentHashMap<>();
-    private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
-    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
+    private  EvictingQueue<MySqlLoadFailRecord> failedRecords;
+    private ScheduledExecutorService periodScheduler;
 
     public MysqlLoadManager(TokenManager tokenManager) {
+        this.tokenManager = tokenManager;
+    }
+
+    public void start() {
+        this.periodScheduler = Executors.newScheduledThreadPool(1,
+                new CustomThreadFactory("mysql-load-fail-record-cleaner"));
         int poolSize = Config.mysql_load_thread_pool;
         // MySqlLoad pool can accept 4 + 4 * 5 = 24  requests by default.
-        this.mysqlLoadPool = 
ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql 
Load", true);
-        this.tokenManager = tokenManager;
+        this.mysqlLoadPool = 
ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
+                "Mysql Load", true);
         this.failedRecords = 
EvictingQueue.create(Config.mysql_load_in_memory_record);
         this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 
24, TimeUnit.HOURS);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
index f4cf4518212..80f6c3f9b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.FrontendService;
@@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit;
 public class TokenManager {
     private static final Logger LOG = LogManager.getLogger(TokenManager.class);
 
-    private final int thriftTimeoutMs = 300 * 1000;
-    private final EvictingQueue<String> tokenQueue;
-    private final ScheduledExecutorService tokenGenerator;
+    private  int thriftTimeoutMs = 300 * 1000;
+    private  EvictingQueue<String> tokenQueue;
+    private  ScheduledExecutorService tokenGenerator;
 
     public TokenManager() {
+    }
+
+    public void start() {
         this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
         // init one token to avoid async issue.
         this.tokenQueue.offer(generateNewToken());
-        this.tokenGenerator = Executors.newScheduledThreadPool(1);
-        this.tokenGenerator.scheduleAtFixedRate(() -> {
-            tokenQueue.offer(generateNewToken());
-        }, 0, Config.token_generate_period_hour, TimeUnit.HOURS);
+        this.tokenGenerator = Executors.newScheduledThreadPool(1,
+                new CustomThreadFactory("token-generator"));
+        this.tokenGenerator.scheduleAtFixedRate(() -> 
tokenQueue.offer(generateNewToken()), 0,
+                Config.token_generate_period_hour, TimeUnit.HOURS);
     }
 
     private String generateNewToken() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 7295f40b602..b58f26b863a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
@@ -66,9 +67,11 @@ public class MTMVJobManager {
 
     private final MTMVTaskManager taskManager;
 
-    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-job-period-scheduler"));
 
-    private ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
 
     private final ReentrantReadWriteLock rwLock;
 
@@ -86,13 +89,15 @@ public class MTMVJobManager {
             // check the scheduler before using it
             // since it may be shutdown when master change to follower without 
process shutdown.
             if (periodScheduler.isShutdown()) {
-                periodScheduler = Executors.newScheduledThreadPool(1);
+                periodScheduler = Executors.newScheduledThreadPool(1,
+                        new CustomThreadFactory("mtmv-job-period-scheduler"));
             }
 
             registerJobs();
 
             if (cleanerScheduler.isShutdown()) {
-                cleanerScheduler = Executors.newScheduledThreadPool(1);
+                cleanerScheduler = Executors.newScheduledThreadPool(1,
+                        new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
             }
             cleanerScheduler.scheduleAtFixedRate(() -> {
                 if (!Env.getCurrentEnv().isMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index d6e370480bb..138ede9e075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.mtmv.MTMVUtils.TaskState;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
@@ -65,13 +66,14 @@ public class MTMVTaskManager {
     // keep track of all the completed tasks
     private final Deque<MTMVTask> historyTasks = 
Queues.newLinkedBlockingDeque();
 
-    private ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-task-scheduler"));
 
     private final AtomicInteger failedTaskCount = new AtomicInteger(0);
 
     public void startTaskScheduler() {
         if (taskScheduler.isShutdown()) {
-            taskScheduler = Executors.newScheduledThreadPool(1);
+            taskScheduler = Executors.newScheduledThreadPool(1, new 
CustomThreadFactory("mtmv-task-scheduler"));
         }
         taskScheduler.scheduleAtFixedRate(() -> {
             checkRunningTask();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
index b592b9df930..13ae9b6e44e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
@@ -34,6 +34,7 @@ public class TokenManagerTest {
     @Test
     public void testTokenCheck() throws UserException {
         TokenManager tokenManager = new TokenManager();
+        tokenManager.start();
         String token = tokenManager.acquireToken();
         Assert.assertTrue(tokenManager.checkAuthToken(token));
     }
@@ -41,6 +42,7 @@ public class TokenManagerTest {
     @Test
     public void testSameToken() throws UserException {
         TokenManager tokenManager = new TokenManager();
+        tokenManager.start();
         String token1 = tokenManager.acquireToken();
         String token2 = tokenManager.acquireToken();
         Assert.assertNotNull(token1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to