This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc82cd  [Code Cleanup]Use ThreadPoolManager to manage some native 
thread (#3997)
1fc82cd is described below

commit 1fc82cd6e48ef5fed4d29b7e5cb4df63c3130027
Author: WingC <1018957...@qq.com>
AuthorDate: Sun Jul 5 03:26:22 2020 -0500

    [Code Cleanup]Use ThreadPoolManager to manage some native thread (#3997)
    
    Now, FE use ThreadPoolManager to manage and monitor all Thread,
    but there are still some threads are not managed. And FE use `Timer` class
    to do some scheduler task, but `Timer` class has some problem and is out of 
date,
    It should replace by ScheduledThreadPool.
---
 .../java/org/apache/doris/common/ThreadPoolManager.java  |  8 ++++++++
 .../apache/doris/common/publish/FixedTimePublisher.java  |  3 ++-
 fe/src/main/java/org/apache/doris/metric/MetricRepo.java |  8 +++++---
 fe/src/main/java/org/apache/doris/mysql/MysqlServer.java | 16 ++++++++++------
 .../main/java/org/apache/doris/qe/ConnectScheduler.java  |  9 +++++----
 .../java/org/apache/doris/task/MasterTaskExecutor.java   |  4 ++--
 6 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index c7dafeb..c177cf7 100644
--- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -113,6 +114,13 @@ public class ThreadPoolManager {
         return threadPool;
     }
 
+    public static ScheduledThreadPoolExecutor newScheduledThreadPool(int 
maxNumThread, String poolName) {
+        ThreadFactory threadFactory = namedThreadFactory(poolName);
+        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new 
ScheduledThreadPoolExecutor(maxNumThread, threadFactory);
+        nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
+        return scheduledThreadPoolExecutor;
+    }
+
     /**
      * Create a thread factory that names threads with a prefix and also sets 
the threads to daemon.
      */
diff --git 
a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java 
b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java
index d67a09e..658068d 100644
--- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java
+++ b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.publish;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit;
 public class FixedTimePublisher {
     private static FixedTimePublisher INSTANCE;
 
-    private ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1);
+    private ScheduledThreadPoolExecutor scheduler = 
ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher");
     private ClusterStatePublisher publisher;
 
     public FixedTimePublisher(ClusterStatePublisher publisher) {
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
index 1fa58a2..8d41399 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -22,6 +22,7 @@ import org.apache.doris.alter.AlterJob.JobType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadManager;
@@ -42,7 +43,8 @@ import org.apache.logging.log4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
-import java.util.Timer;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class MetricRepo {
@@ -84,7 +86,7 @@ public final class MetricRepo {
     public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
     public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
 
-    private static Timer metricTimer = new Timer();
+    private static ScheduledThreadPoolExecutor metricTimer = 
ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool");
     private static MetricCalculator metricCalculator = new MetricCalculator();
 
     public static synchronized void init() {
@@ -249,7 +251,7 @@ public final class MetricRepo {
         isInit.set(true);
 
         if (Config.enable_metric_calculator) {
-            metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000 /* 
15s */);
+            metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000L, 
TimeUnit.MILLISECONDS);
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java 
b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java
index 28980d6..1cd099b 100644
--- a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java
+++ b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java
@@ -18,6 +18,7 @@
 package org.apache.doris.mysql;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectScheduler;
 import org.apache.logging.log4j.LogManager;
@@ -27,6 +28,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 // MySQL protocol network service
 public class MysqlServer {
@@ -37,7 +40,8 @@ public class MysqlServer {
     private ServerSocketChannel serverChannel = null;
     private ConnectScheduler scheduler = null;
     // used to accept connect request from client
-    private Thread listener;
+    private ThreadPoolExecutor listener;
+    private Future listenerFuture;
 
     public MysqlServer(int port, ConnectScheduler scheduler) {
         this.port = port;
@@ -66,9 +70,9 @@ public class MysqlServer {
         }
 
         // start accept thread
-        listener = new Thread(new Listener(), "MySQL Protocol Listener");
+        listener = ThreadPoolManager.newDaemonCacheThreadPool(1, 
"MySQL-Protocol-Listener");
         running = true;
-        listener.start();
+        listenerFuture = listener.submit(new Listener());
 
         return true;
     }
@@ -87,8 +91,8 @@ public class MysqlServer {
 
     public void join() {
         try {
-            listener.join();
-        } catch (InterruptedException e) {
+            listenerFuture.get();
+        } catch (Exception e) {
             // just return
             LOG.warn("Join MySQL server exception.", e);
         }
@@ -98,7 +102,7 @@ public class MysqlServer {
         @Override
         public void run() {
             while (running && serverChannel.isOpen()) {
-                SocketChannel clientChannel = null;
+                SocketChannel clientChannel;
                 try {
                     clientChannel = serverChannel.accept();
                     if (clientChannel == null) {
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 35d23f2..6f4ec55 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -32,9 +32,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 // 查询请求的调度器
@@ -53,14 +54,14 @@ public class ConnectScheduler {
     // 1. If use a scheduler, the task maybe a huge number when query is messy.
     //    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks 
in scheduler.
     // 2. Use a thread to poll maybe lose some accurate, but is enough to us.
-    private Timer checkTimer;
+    private ScheduledExecutorService checkTimer = 
ThreadPoolManager.newScheduledThreadPool(1,
+            "Connect-Scheduler-Check-Timer");
 
     public ConnectScheduler(int maxConnections) {
         this.maxConnections = maxConnections;
         numberConnection = 0;
         nextConnectionId = new AtomicInteger(0);
-        checkTimer = new Timer("ConnectScheduler Check Timer", true);
-        checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000);
+        checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, 
TimeUnit.MILLISECONDS);
     }
 
     private class TimeoutChecker extends TimerTask {
diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java 
b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
index 9ad2591..90aad6b 100644
--- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
+++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
@@ -19,13 +19,13 @@ package org.apache.doris.task;
 
 import com.google.common.collect.Maps;
 
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +37,7 @@ public class MasterTaskExecutor {
     private Map<Long, Future<?>> runningTasks;
 
     public MasterTaskExecutor(int threadNum) {
-        executor = Executors.newScheduledThreadPool(threadNum);
+        executor = ThreadPoolManager.newScheduledThreadPool(threadNum, 
"Master-Task-Executor-Pool");
         runningTasks = Maps.newHashMap();
         executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, 
TimeUnit.MILLISECONDS);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to