This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 0242f876c18 (pick-2.0-26031)[Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031) (#26139) 0242f876c18 is described below commit 0242f876c18ab2e095e0c16a69757b32520269d9 Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Wed Nov 1 21:03:01 2023 +0800 (pick-2.0-26031)[Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031) (#26139) --- .../main/java/org/apache/doris/catalog/Env.java | 2 + .../apache/doris/common/CustomThreadFactory.java | 46 ++++++++++++++++++++++ .../org/apache/doris/load/loadv2/LoadManager.java | 5 +++ .../apache/doris/load/loadv2/MysqlLoadManager.java | 17 +++++--- .../org/apache/doris/load/loadv2/TokenManager.java | 18 +++++---- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++-- .../org/apache/doris/mtmv/MTMVTaskManager.java | 6 ++- .../apache/doris/load/loadv2/TokenManagerTest.java | 2 + 8 files changed, 91 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f58fcb9287c..66f80a0a4c3 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1511,6 +1511,8 @@ public class Env { // start threads that should running on all FE private void startNonMasterDaemonThreads() { + // start load manager thread + loadManager.start(); tabletStatMgr.start(); // load and export job label cleaner thread labelCleaner.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java new file mode 100644 index 00000000000..153131ec251 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public CustomThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 73d4d1a57ad..3f05e2c5a98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -104,6 +104,11 @@ public class LoadManager implements Writable { this.mysqlLoadManager = new MysqlLoadManager(tokenManager); } + public void start() { + tokenManager.start(); + mysqlLoadManager.start(); + } + /** * This method will be invoked by the broker load(v2) now. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index ae1871e0d9c..42583506c56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; @@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); - private final ThreadPoolExecutor mysqlLoadPool; + private ThreadPoolExecutor mysqlLoadPool; private final TokenManager tokenManager; private static class MySqlLoadContext { @@ -137,14 +138,20 @@ public class MysqlLoadManager { } private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>(); - private final EvictingQueue<MySqlLoadFailRecord> failedRecords; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private EvictingQueue<MySqlLoadFailRecord> failedRecords; + private ScheduledExecutorService periodScheduler; public MysqlLoadManager(TokenManager tokenManager) { + this.tokenManager = tokenManager; + } + + public void start() { + this.periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mysql-load-fail-record-cleaner")); int poolSize = Config.mysql_load_thread_pool; // MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default. - this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true); - this.tokenManager = tokenManager; + this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, + "Mysql Load", true); this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record); this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index f4cf4518212..80f6c3f9b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.thrift.FrontendService; @@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit; public class TokenManager { private static final Logger LOG = LogManager.getLogger(TokenManager.class); - private final int thriftTimeoutMs = 300 * 1000; - private final EvictingQueue<String> tokenQueue; - private final ScheduledExecutorService tokenGenerator; + private int thriftTimeoutMs = 300 * 1000; + private EvictingQueue<String> tokenQueue; + private ScheduledExecutorService tokenGenerator; public TokenManager() { + } + + public void start() { this.tokenQueue = EvictingQueue.create(Config.token_queue_size); // init one token to avoid async issue. this.tokenQueue.offer(generateNewToken()); - this.tokenGenerator = Executors.newScheduledThreadPool(1); - this.tokenGenerator.scheduleAtFixedRate(() -> { - tokenQueue.offer(generateNewToken()); - }, 0, Config.token_generate_period_hour, TimeUnit.HOURS); + this.tokenGenerator = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("token-generator")); + this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0, + Config.token_generate_period_hour, TimeUnit.HOURS); } private String generateNewToken() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 7295f40b602..b58f26b863a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedView; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; @@ -66,9 +67,11 @@ public class MTMVJobManager { private final MTMVTaskManager taskManager; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); - private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); private final ReentrantReadWriteLock rwLock; @@ -86,13 +89,15 @@ public class MTMVJobManager { // check the scheduler before using it // since it may be shutdown when master change to follower without process shutdown. if (periodScheduler.isShutdown()) { - periodScheduler = Executors.newScheduledThreadPool(1); + periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); } registerJobs(); if (cleanerScheduler.isShutdown()) { - cleanerScheduler = Executors.newScheduledThreadPool(1); + cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); } cleanerScheduler.scheduleAtFixedRate(() -> { if (!Env.getCurrentEnv().isMaster()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index d6e370480bb..138ede9e075 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -20,6 +20,7 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.mtmv.MTMVUtils.TaskState; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; @@ -65,13 +66,14 @@ public class MTMVTaskManager { // keep track of all the completed tasks private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque(); - private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-task-scheduler")); private final AtomicInteger failedTaskCount = new AtomicInteger(0); public void startTaskScheduler() { if (taskScheduler.isShutdown()) { - taskScheduler = Executors.newScheduledThreadPool(1); + taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler")); } taskScheduler.scheduleAtFixedRate(() -> { checkRunningTask(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java index b592b9df930..13ae9b6e44e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java @@ -34,6 +34,7 @@ public class TokenManagerTest { @Test public void testTokenCheck() throws UserException { TokenManager tokenManager = new TokenManager(); + tokenManager.start(); String token = tokenManager.acquireToken(); Assert.assertTrue(tokenManager.checkAuthToken(token)); } @@ -41,6 +42,7 @@ public class TokenManagerTest { @Test public void testSameToken() throws UserException { TokenManager tokenManager = new TokenManager(); + tokenManager.start(); String token1 = tokenManager.acquireToken(); String token2 = tokenManager.acquireToken(); Assert.assertNotNull(token1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org