This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4fcb1ae518153b5d2ad95463588763c3c36dade6
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Mon Apr 1 10:30:38 2024 +0800

    Use future to block auto analyze before job finish. (#33083)
---
 .../doris/statistics/AnalysisTaskExecutor.java     |  7 ++++---
 .../doris/statistics/StatisticsAutoCollector.java  | 23 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 3bdccaca047..d787794534a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.Comparator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -52,7 +53,7 @@ public class AnalysisTaskExecutor {
                     simultaneouslyRunningTaskNum,
                     simultaneouslyRunningTaskNum, 0,
                     TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
-                    new BlockedPolicy("Analysis Job Executor", 
Integer.MAX_VALUE),
+                    new BlockedPolicy("Analysis Job Executor Block Policy", 
Integer.MAX_VALUE),
                     "Analysis Job Executor", true);
             cancelExpiredTask();
         } else {
@@ -88,9 +89,9 @@ public class AnalysisTaskExecutor {
         }
     }
 
-    public void submitTask(BaseAnalysisTask task) {
+    public Future<?> submitTask(BaseAnalysisTask task) {
         AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
-        executors.submit(taskWrapper);
+        return executors.submit(taskWrapper);
     }
 
     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 57f3f494573..bf0179f5603 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -45,6 +45,8 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -134,7 +136,15 @@ public class StatisticsAutoCollector extends MasterDaemon {
         }
         AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, 
priority);
         LOG.debug("Auto analyze job : {}", analyzeJob.toString());
-        executeSystemAnalysisJob(analyzeJob);
+        try {
+            executeSystemAnalysisJob(analyzeJob);
+        } catch (Exception e) {
+            StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
+            for (Pair<String, String> pair : columns) {
+                stringJoiner.add(pair.toString());
+            }
+            LOG.warn("Fail to auto analyze table {}, columns [{}]", 
table.getName(), stringJoiner.toString());
+        }
     }
 
     protected void appendPartitionColumns(TableIf table, Set<Pair<String, 
String>> columns) throws DdlException {
@@ -205,7 +215,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
     // Analysis job created by the system
     @VisibleForTesting
     protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
-            throws DdlException {
+            throws DdlException, ExecutionException, InterruptedException {
         Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
         analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
@@ -215,7 +225,14 @@ public class StatisticsAutoCollector extends MasterDaemon {
         }
         Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, 
analysisTasks.values());
         Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTasks);
-        analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
+        Future<?>[] futures = new Future[analysisTasks.values().size()];
+        int i = 0;
+        for (BaseAnalysisTask task : analysisTasks.values()) {
+            futures[i++] = analysisTaskExecutor.submitTask(task);
+        }
+        for (Future future : futures) {
+            future.get();
+        }
     }
 
     protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to