This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4fcb1ae518153b5d2ad95463588763c3c36dade6 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Mon Apr 1 10:30:38 2024 +0800 Use future to block auto analyze before job finish. (#33083) --- .../doris/statistics/AnalysisTaskExecutor.java | 7 ++++--- .../doris/statistics/StatisticsAutoCollector.java | 23 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 3bdccaca047..d787794534a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import java.util.Comparator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -52,7 +53,7 @@ public class AnalysisTaskExecutor { simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize), - new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), + new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE), "Analysis Job Executor", true); cancelExpiredTask(); } else { @@ -88,9 +89,9 @@ public class AnalysisTaskExecutor { } } - public void submitTask(BaseAnalysisTask task) { + public Future<?> submitTask(BaseAnalysisTask task) { AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task); - executors.submit(taskWrapper); + return executors.submit(taskWrapper); } public void putJob(AnalysisTaskWrapper wrapper) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 57f3f494573..bf0179f5603 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -45,6 +45,8 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -134,7 +136,15 @@ public class StatisticsAutoCollector extends MasterDaemon { } AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority); LOG.debug("Auto analyze job : {}", analyzeJob.toString()); - executeSystemAnalysisJob(analyzeJob); + try { + executeSystemAnalysisJob(analyzeJob); + } catch (Exception e) { + StringJoiner stringJoiner = new StringJoiner(",", "[", "]"); + for (Pair<String, String> pair : columns) { + stringJoiner.add(pair.toString()); + } + LOG.warn("Fail to auto analyze table {}, columns [{}]", table.getName(), stringJoiner.toString()); + } } protected void appendPartitionColumns(TableIf table, Set<Pair<String, String>> columns) throws DdlException { @@ -205,7 +215,7 @@ public class StatisticsAutoCollector extends MasterDaemon { // Analysis job created by the system @VisibleForTesting protected void executeSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { + throws DdlException, ExecutionException, InterruptedException { Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); @@ -215,7 +225,14 @@ public class StatisticsAutoCollector extends MasterDaemon { } Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); - analysisTasks.values().forEach(analysisTaskExecutor::submitTask); + Future<?>[] futures = new Future[analysisTasks.values().size()]; + int i = 0; + for (BaseAnalysisTask task : analysisTasks.values()) { + futures[i++] = analysisTaskExecutor.submitTask(task); + } + for (Future future : futures) { + future.get(); + } } protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org