This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1fc82cd [Code Cleanup]Use ThreadPoolManager to manage some native thread (#3997) 1fc82cd is described below commit 1fc82cd6e48ef5fed4d29b7e5cb4df63c3130027 Author: WingC <1018957...@qq.com> AuthorDate: Sun Jul 5 03:26:22 2020 -0500 [Code Cleanup]Use ThreadPoolManager to manage some native thread (#3997) Currently, FE uses ThreadPoolManager to manage and monitor its threads, but some threads are still not managed by it. FE also uses the `Timer` class to run some scheduled tasks, but `Timer` has known problems and is outdated; it should be replaced by ScheduledThreadPoolExecutor. --- .../java/org/apache/doris/common/ThreadPoolManager.java | 8 ++++++++ .../apache/doris/common/publish/FixedTimePublisher.java | 3 ++- fe/src/main/java/org/apache/doris/metric/MetricRepo.java | 8 +++++--- fe/src/main/java/org/apache/doris/mysql/MysqlServer.java | 16 ++++++++++------ .../main/java/org/apache/doris/qe/ConnectScheduler.java | 9 +++++---- .../java/org/apache/doris/task/MasterTaskExecutor.java | 4 ++-- 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c7dafeb..c177cf7 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -113,6 +114,13 @@ public class ThreadPoolManager { return threadPool; } + public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) { + 
ThreadFactory threadFactory = namedThreadFactory(poolName); + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory); + nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + return scheduledThreadPoolExecutor; + } + /** * Create a thread factory that names threads with a prefix and also sets the threads to daemon. */ diff --git a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java index d67a09e..658068d 100644 --- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java +++ b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java @@ -18,6 +18,7 @@ package org.apache.doris.common.publish; import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit; public class FixedTimePublisher { private static FixedTimePublisher INSTANCE; - private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); + private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher"); private ClusterStatePublisher publisher; public FixedTimePublisher(ClusterStatePublisher publisher) { diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 1fa58a2..8d41399 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -22,6 +22,7 @@ import org.apache.doris.alter.AlterJob.JobType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; import 
org.apache.doris.load.EtlJobType; import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadManager; @@ -42,7 +43,8 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; import java.util.SortedMap; -import java.util.Timer; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public final class MetricRepo { @@ -84,7 +86,7 @@ public final class MetricRepo { public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE; - private static Timer metricTimer = new Timer(); + private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool"); private static MetricCalculator metricCalculator = new MetricCalculator(); public static synchronized void init() { @@ -249,7 +251,7 @@ public final class MetricRepo { isInit.set(true); if (Config.enable_metric_calculator) { - metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000 /* 15s */); + metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000L, TimeUnit.MILLISECONDS); } } diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java index 28980d6..1cd099b 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -18,6 +18,7 @@ package org.apache.doris.mysql; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectScheduler; import org.apache.logging.log4j.LogManager; @@ -27,6 +28,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.concurrent.Future; +import 
java.util.concurrent.ThreadPoolExecutor; // MySQL protocol network service public class MysqlServer { @@ -37,7 +40,8 @@ public class MysqlServer { private ServerSocketChannel serverChannel = null; private ConnectScheduler scheduler = null; // used to accept connect request from client - private Thread listener; + private ThreadPoolExecutor listener; + private Future listenerFuture; public MysqlServer(int port, ConnectScheduler scheduler) { this.port = port; @@ -66,9 +70,9 @@ public class MysqlServer { } // start accept thread - listener = new Thread(new Listener(), "MySQL Protocol Listener"); + listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener"); running = true; - listener.start(); + listenerFuture = listener.submit(new Listener()); return true; } @@ -87,8 +91,8 @@ public class MysqlServer { public void join() { try { - listener.join(); - } catch (InterruptedException e) { + listenerFuture.get(); + } catch (Exception e) { // just return LOG.warn("Join MySQL server exception.", e); } @@ -98,7 +102,7 @@ public class MysqlServer { @Override public void run() { while (running && serverChannel.isOpen()) { - SocketChannel clientChannel = null; + SocketChannel clientChannel; try { clientChannel = serverChannel.accept(); if (clientChannel == null) { diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 35d23f2..6f4ec55 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -32,9 +32,10 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; // 查询请求的调度器 @@ -53,14 +54,14 @@ public class ConnectScheduler { // 
1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. // 2. Use a thread to poll maybe lose some accurate, but is enough to us. - private Timer checkTimer; + private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1, + "Connect-Scheduler-Check-Timer"); public ConnectScheduler(int maxConnections) { this.maxConnections = maxConnections; numberConnection = 0; nextConnectionId = new AtomicInteger(0); - checkTimer = new Timer("ConnectScheduler Check Timer", true); - checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000); + checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS); } private class TimeoutChecker extends TimerTask { diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 9ad2591..90aad6b 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -19,13 +19,13 @@ package org.apache.doris.task; import com.google.common.collect.Maps; +import org.apache.doris.common.ThreadPoolManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -37,7 +37,7 @@ public class MasterTaskExecutor { private Map<Long, Future<?>> runningTasks; public MasterTaskExecutor(int threadNum) { - executor = Executors.newScheduledThreadPool(threadNum); + executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool"); runningTasks = Maps.newHashMap(); executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org