This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new afb6a57aa8 [enhancement](nereids) Improve stats preload performance (#21970)
afb6a57aa8 is described below

commit afb6a57aa8741ecc0a61754100d17703d5c9894c
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Jul 31 17:32:01 2023 +0800

    [enhancement](nereids) Improve stats preload performance (#21970)
---
 .../main/java/org/apache/doris/common/Config.java  |  13 ++-
 .../org/apache/doris/analysis/AnalyzeTblStmt.java  |   7 +-
 .../java/org/apache/doris/nereids/memo/Memo.java   |   2 +-
 .../apache/doris/statistics/AnalysisManager.java   |  17 ++--
 .../doris/statistics/AnalysisTaskExecutor.java     |  40 +++-----
 .../doris/statistics/AnalysisTaskScheduler.java    | 108 ---------------------
 .../apache/doris/statistics/BaseAnalysisTask.java  |   8 +-
 .../apache/doris/statistics/ColumnStatistic.java   |  14 +--
 .../apache/doris/statistics/OlapAnalysisTask.java  |  31 +++---
 .../doris/statistics/StatisticsAutoAnalyzer.java   |  25 +++--
 .../apache/doris/statistics/StatisticsCache.java   |  66 ++++++++-----
 .../doris/statistics/StatisticsRepository.java     |  15 +--
 .../doris/statistics/util/StatisticsUtil.java      |  17 +++-
 .../apache/doris/statistics/AnalysisJobTest.java   |  10 +-
 .../doris/statistics/AnalysisTaskExecutorTest.java |  28 ++----
 .../apache/doris/statistics/HistogramTaskTest.java |  14 +--
 regression-test/pipeline/p0/conf/fe.conf           |   2 +-
 17 files changed, 158 insertions(+), 259 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a47ae31cdd..1b4c61a866 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1680,7 +1680,7 @@ public class Config extends ConfigBase {
      * Used to determined how many statistics collection SQL could run 
simultaneously.
      */
     @ConfField
-    public static int statistics_simultaneously_running_task_num = 10;
+    public static int statistics_simultaneously_running_task_num = 5;
 
     /**
      * if table has too many replicas, Fe occur oom when schema change.
@@ -2031,13 +2031,13 @@ public class Config extends ConfigBase {
     public static int hive_stats_partition_sample_size = 3000;
 
     @ConfField
-    public static boolean enable_full_auto_analyze = false;
+    public static boolean enable_full_auto_analyze = true;
 
     @ConfField
     public static String full_auto_analyze_start_time = "00:00:00";
 
     @ConfField
-    public static String full_auto_analyze_end_time = "23:59:59";
+    public static String full_auto_analyze_end_time = "02:00:00";
 
     @ConfField
     public static int statistics_sql_parallel_exec_instance_num = 1;
@@ -2056,4 +2056,11 @@ public class Config extends ConfigBase {
                     + "and modifying table properties. "
                     + "This config is recommended to be used only in the test 
environment"})
     public static int force_olap_table_replication_num = 0;
+
+    @ConfField
+    public static int full_auto_analyze_simultaneously_running_task_num = 1;
+
+    @ConfField
+    public static int cpu_resource_limit_per_analyze_task = 1;
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
index da08f45bee..527f802748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
@@ -36,6 +36,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -192,10 +193,10 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
             }
         }
         if (containsUnsupportedTytpe) {
-            if 
(!ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
+            if (ConnectContext.get() == null
+                    || 
!ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
                 columnNames = columnNames.stream()
-                        .filter(c -> 
!ColumnStatistic.UNSUPPORTED_TYPE.contains(
-                                table.getColumn(c).getType()))
+                        .filter(c -> 
!StatisticsUtil.isUnsupportedType(table.getColumn(c).getType()))
                         .collect(Collectors.toList());
             } else {
                 throw new AnalysisException(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
index a313975684..18ea08d58e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
@@ -115,7 +115,7 @@ public class Memo {
     public void removePhysicalExpression() {
         groupExpressions.entrySet().removeIf(entry -> 
entry.getValue().getPlan() instanceof PhysicalPlan);
 
-        Iterator<Entry<GroupId, Group>> iterator = 
groups.entrySet().iterator();
+        Iterator<Map.Entry<GroupId, Group>> iterator = 
groups.entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<GroupId, Group> entry = iterator.next();
             Group group = entry.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 66f0b94aa8..63ab923234 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -94,8 +94,6 @@ import java.util.stream.Collectors;
 
 public class AnalysisManager extends Daemon implements Writable {
 
-    public AnalysisTaskScheduler taskScheduler;
-
     private static final Logger LOG = 
LogManager.getLogger(AnalysisManager.class);
 
     private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> 
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -112,8 +110,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
     public AnalysisManager() {
         
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
         if (!Env.isCheckpointThread()) {
-            this.taskScheduler = new AnalysisTaskScheduler();
-            this.taskExecutor = new AnalysisTaskExecutor(taskScheduler);
+            this.taskExecutor = new 
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
             this.statisticsCache = new StatisticsCache();
             taskExecutor.start();
         }
@@ -192,7 +189,9 @@ public class AnalysisManager extends Daemon implements 
Writable {
                         table.getName());
                 // columnNames null means to add all visitable columns.
                 AnalyzeTblStmt analyzeTblStmt = new 
AnalyzeTblStmt(analyzeProperties, tableName,
-                        null, db.getId(), table);
+                        table.getBaseSchema().stream().filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType())).map(
+                                Column::getName).collect(
+                                Collectors.toList()), db.getId(), table);
                 try {
                     analyzeTblStmt.check();
                 } catch (AnalysisException analysisException) {
@@ -254,12 +253,13 @@ public class AnalysisManager extends Daemon implements 
Writable {
             return null;
         }
 
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+        analysisTaskInfos.values().forEach(taskExecutor::submitTask);
         return jobInfo;
     }
 
     // Analysis job created by the system
-    public void createSystemAnalysisJob(AnalysisInfo info) throws DdlException 
{
+    public void createSystemAnalysisJob(AnalysisInfo info, 
AnalysisTaskExecutor analysisTaskExecutor)
+            throws DdlException {
         AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
         if (jobInfo.colToPartitions.isEmpty()) {
             // No statistics need to be collected or updated
@@ -273,8 +273,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             persistAnalysisJob(jobInfo);
             analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
         }
-
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+        analysisTaskInfos.values().forEach(taskExecutor::submitTask);
     }
 
     private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index b5ec7aeb87..fb23050fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -35,26 +35,23 @@ public class AnalysisTaskExecutor extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskExecutor.class);
 
-    private final ThreadPoolExecutor executors = 
ThreadPoolManager.newDaemonThreadPool(
-            Config.statistics_simultaneously_running_task_num,
-            Config.statistics_simultaneously_running_task_num, 0,
-            TimeUnit.DAYS, new LinkedBlockingQueue<>(),
-            new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
-            "Analysis Job Executor", true);
-
-    private final AnalysisTaskScheduler taskScheduler;
+    private final ThreadPoolExecutor executors;
 
     private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
             new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
                     
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
-    public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
-        this.taskScheduler = jobExecutor;
+    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
+        executors = ThreadPoolManager.newDaemonThreadPool(
+                simultaneouslyRunningTaskNum,
+                simultaneouslyRunningTaskNum, 0,
+                TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+                new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
+                "Analysis Job Executor", true);
     }
 
     @Override
     public void run() {
-        fetchAndExecute();
         cancelExpiredTask();
     }
 
@@ -82,22 +79,7 @@ public class AnalysisTaskExecutor extends Thread {
         }
     }
 
-    public void fetchAndExecute() {
-        Thread t = new Thread(() -> {
-            for (;;) {
-                try {
-                    doFetchAndExecute();
-                } catch (Throwable throwable) {
-                    LOG.warn(throwable);
-                }
-            }
-        }, "Analysis Task Submitter");
-        t.setDaemon(true);
-        t.start();
-    }
-
-    private void doFetchAndExecute() {
-        BaseAnalysisTask task = taskScheduler.getPendingTasks();
+    public void submitTask(BaseAnalysisTask task) {
         AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
         executors.submit(taskWrapper);
     }
@@ -105,4 +87,8 @@ public class AnalysisTaskExecutor extends Thread {
     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
         taskQueue.put(wrapper);
     }
+
+    public boolean idle() {
+        return executors.getQueue().isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
deleted file mode 100644
index 5c9de2b58b..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-public class AnalysisTaskScheduler {
-
-    private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskScheduler.class);
-
-    private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
-            new 
PriorityQueue<>(Comparator.comparingLong(BaseAnalysisTask::getLastExecTime));
-
-    private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
-
-    private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
-
-    private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
-
-    public synchronized void schedule(BaseAnalysisTask analysisTask) {
-        try {
-
-            switch (analysisTask.info.jobType) {
-                case MANUAL:
-                    addToManualJobQueue(analysisTask);
-                    break;
-                case SYSTEM:
-                    addToSystemQueue(analysisTask);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown job type: " + 
analysisTask.info.jobType);
-            }
-        } catch (Throwable t) {
-            Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
-                    analysisTask.info, AnalysisState.FAILED, t.getMessage(), 
System.currentTimeMillis());
-        }
-    }
-
-    // Make sure invoker of this method is synchronized on object.
-
-    private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
-        if (systemJobSet.contains(analysisJobInfo)) {
-            return;
-        }
-        systemJobSet.add(analysisJobInfo);
-        systemJobQueue.add(analysisJobInfo);
-        notify();
-    }
-
-    // Make sure invoker of this method is synchronized on object.
-    private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
-        if (manualJobSet.contains(analysisJobInfo)) {
-            return;
-        }
-        manualJobSet.add(analysisJobInfo);
-        manualJobQueue.add(analysisJobInfo);
-        notify();
-    }
-
-    public synchronized BaseAnalysisTask getPendingTasks() {
-        while (true) {
-            if (!manualJobQueue.isEmpty()) {
-                return pollAndRemove(manualJobQueue, manualJobSet);
-            }
-            if (!systemJobQueue.isEmpty()) {
-                return pollAndRemove(systemJobQueue, systemJobSet);
-            }
-            try {
-                wait();
-            } catch (Exception e) {
-                LOG.warn("Thread get interrupted when waiting for pending 
jobs", e);
-                return null;
-            }
-        }
-    }
-
-    // Poll from queue, remove from set. Make sure invoker of this method is 
synchronized on object.
-    private BaseAnalysisTask pollAndRemove(Queue<BaseAnalysisTask> q, 
Set<BaseAnalysisTask> s) {
-        BaseAnalysisTask t = q.poll();
-        s.remove(t);
-        return t;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index e146a2e8e3..fc264e0661 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -26,8 +26,10 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -152,10 +154,8 @@ public abstract class BaseAnalysisTask {
             if (col == null) {
                 throw new RuntimeException(String.format("Column with name %s 
not exists", info.tblName));
             }
-            if (isUnsupportedType(col.getType().getPrimitiveType())) {
-                throw new RuntimeException(String.format("Column with type %s 
is not supported",
-                        col.getType().toString()));
-            }
+            
Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
+                    String.format("Column with type %s is not supported", 
col.getType().toString()));
         }
 
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 6887108a68..7e23c83e07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -19,9 +19,7 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -206,9 +204,6 @@ public class ColumnStatistic {
                 columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
             }
             columnStatisticBuilder.setSelectivity(1.0);
-            Histogram histogram = 
Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName)
-                    .orElse(null);
-            columnStatisticBuilder.setHistogram(histogram);
             
columnStatisticBuilder.setUpdatedTime(resultRow.getColumnValue("update_time"));
             return columnStatisticBuilder.build();
         } catch (Exception e) {
@@ -428,12 +423,7 @@ public class ColumnStatistic {
         return isUnKnown;
     }
 
-    public void loadPartitionStats(long tableId, long idxId, String colName) 
throws DdlException {
-        List<ResultRow> resultRows = 
StatisticsRepository.loadPartStats(tableId, idxId, colName);
-        for (ResultRow resultRow : resultRows) {
-            String partId = resultRow.getColumnValue("part_id");
-            ColumnStatistic columnStatistic = 
ColumnStatistic.fromResultRow(resultRow);
-            partitionIdToColStats.put(Long.parseLong(partId), columnStatistic);
-        }
+    public void putPartStats(long partId, ColumnStatistic columnStatistic) {
+        this.partitionIdToColStats.put(partId, columnStatistic);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index a980385bde..3f3a04d620 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -110,21 +110,24 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
 
     @VisibleForTesting
     public void execSQL(String sql) throws Exception {
-        if (killed) {
-            return;
-        }
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze 
%s.%s.%s, error: %s sql: %s",
-                        info.catalogName, info.dbName, info.colName, sql, 
queryState.getErrorMessage()));
+        synchronized (OlapAnalysisTask.class) {
+            if (killed) {
+                return;
+            }
+            long startTime = System.currentTimeMillis();
+            try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext()) {
+                
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                r.connectContext.setExecutor(stmtExecutor);
+                stmtExecutor.execute();
+                QueryState queryState = r.connectContext.getState();
+                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                    throw new RuntimeException(String.format("Failed to 
analyze %s.%s.%s, error: %s sql: %s",
+                            info.catalogName, info.dbName, info.colName, sql, 
queryState.getErrorMessage()));
+                }
+                LOG.info("Analyze SQL: " + sql + " cost time: " + 
(System.currentTimeMillis() - startTime) + "ms");
             }
-            LOG.info("Analyze SQL: " + sql + " cost time: " + 
(System.currentTimeMillis() - startTime) + "ms");
         }
+
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index 181af16882..02fe96ec71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -50,8 +50,11 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
 
     private static final Logger LOG = 
LogManager.getLogger(StatisticsAutoAnalyzer.class);
 
+    private AnalysisTaskExecutor analysisTaskExecutor;
+
     public StatisticsAutoAnalyzer() {
         super("Automatic Analyzer", 
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+        analysisTaskExecutor = new 
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
     }
 
     @Override
@@ -66,12 +69,16 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             return;
         }
 
-        // if (!Config.enable_full_auto_analyze) {
-        //     analyzePeriodically();
-        //     analyzeAutomatically();
-        // } else {
-        //     analyzeAll();
-        // }
+        if (!analysisTaskExecutor.idle()) {
+            return;
+        }
+
+        if (!Config.enable_full_auto_analyze) {
+            analyzePeriodically();
+            analyzeAutomatically();
+        } else {
+            analyzeAll();
+        }
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -92,7 +99,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
                         if (analysisInfo == null) {
                             continue;
                         }
-                        analysisManager.createSystemAnalysisJob(analysisInfo);
+                        analysisManager.createSystemAnalysisJob(analysisInfo, 
analysisTaskExecutor);
                     }
                 }
             } catch (Throwable t) {
@@ -109,7 +116,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
             for (AnalysisInfo jobInfo : jobInfos) {
                 jobInfo = new 
AnalysisInfoBuilder(jobInfo).setJobType(JobType.SYSTEM).build();
-                analysisManager.createSystemAnalysisJob(jobInfo);
+                analysisManager.createSystemAnalysisJob(jobInfo, 
analysisTaskExecutor);
             }
         } catch (DdlException e) {
             LOG.warn("Failed to periodically analyze the statistics." + e);
@@ -124,7 +131,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             try {
                 checkedJobInfo = getReAnalyzeRequiredPart(jobInfo);
                 if (checkedJobInfo != null) {
-                    analysisManager.createSystemAnalysisJob(checkedJobInfo);
+                    analysisManager.createSystemAnalysisJob(checkedJobInfo, 
analysisTaskExecutor);
                 }
             } catch (Throwable t) {
                 LOG.warn("Failed to create analyze job: {}", checkedJobInfo, 
t);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 1149ecdd5a..d5a3b972f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -38,10 +38,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -213,6 +216,7 @@ public class StatisticsCache {
         if (CollectionUtils.isEmpty(recentStatsUpdatedCols)) {
             return;
         }
+        Map<StatisticsCacheKey, ColumnStatistic> keyToColStats = new 
HashMap<>();
         for (ResultRow r : recentStatsUpdatedCols) {
             try {
                 long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
@@ -221,12 +225,17 @@ public class StatisticsCache {
                 final StatisticsCacheKey k =
                         new StatisticsCacheKey(tblId, idxId, colId);
                 final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
-                c.loadPartitionStats(tblId, idxId, colId);
+                keyToColStats.put(k, c);
                 putCache(k, c);
             } catch (Throwable t) {
                 LOG.warn("Error when preheating stats cache", t);
             }
         }
+        try {
+            loadPartStats(keyToColStats);
+        } catch (Exception e) {
+            LOG.warn("Fucka", e);
+        }
     }
 
     public void syncLoadColStats(long tableId, long idxId, String colName) {
@@ -261,32 +270,43 @@ public class StatisticsCache {
     }
 
     public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
-        CompletableFuture<Optional<ColumnStatistic>> f = new 
CompletableFuture<Optional<ColumnStatistic>>() {
-
-            @Override
-            public Optional<ColumnStatistic> get() throws 
InterruptedException, ExecutionException {
-                return Optional.of(c);
-            }
-
-            @Override
-            public boolean isDone() {
-                return true;
-            }
+        CompletableFuture<Optional<ColumnStatistic>> f = new 
CompletableFuture<Optional<ColumnStatistic>>();
+        f.obtrudeValue(Optional.of(c));
+        columnStatisticsCache.put(k, f);
+    }
 
-            @Override
-            public boolean complete(Optional<ColumnStatistic> value) {
-                return true;
+    private void loadPartStats(Map<StatisticsCacheKey, ColumnStatistic> 
keyToColStats) {
+        final int batchSize = Config.expr_children_limit;
+        Set<StatisticsCacheKey> keySet = new HashSet<>();
+        for (StatisticsCacheKey statisticsCacheKey : keyToColStats.keySet()) {
+            if (keySet.size() < batchSize - 1) {
+                keySet.add(statisticsCacheKey);
+            } else {
+                List<ResultRow> partStats = 
StatisticsRepository.loadPartStats(keySet);
+                addPartStatsToColStats(keyToColStats, partStats);
+                keySet = new HashSet<>();
             }
+        }
+        if (!keySet.isEmpty()) {
+            List<ResultRow> partStats = 
StatisticsRepository.loadPartStats(keySet);
+            addPartStatsToColStats(keyToColStats, partStats);
+        }
+    }
 
-            @Override
-            public Optional<ColumnStatistic> join() {
-                return Optional.of(c);
+    private void addPartStatsToColStats(Map<StatisticsCacheKey, 
ColumnStatistic> keyToColStats,
+            List<ResultRow> partsStats) {
+        for (ResultRow r : partsStats) {
+            try {
+                long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
+                long idxId = Long.parseLong(r.getColumnValue("idx_id"));
+                long partId = Long.parseLong(r.getColumnValue("part_id"));
+                String colId = r.getColumnValue("col_id");
+                ColumnStatistic partStats = ColumnStatistic.fromResultRow(r);
+                keyToColStats.get(new StatisticsCacheKey(tblId, idxId, 
colId)).putPartStats(partId, partStats);
+            } catch (Throwable t) {
+                LOG.warn("Failed to deserialized part stats", t);
             }
-        };
-        if (c.isUnKnown) {
-            return;
         }
-        columnStatisticsCache.put(k, f);
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index d20bb358c1..7a043e7708 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -38,6 +38,7 @@ import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -126,7 +127,7 @@ public class StatisticsRepository {
 
     private static final String QUERY_PARTITION_STATISTICS = "SELECT * FROM " 
+ FeConstants.INTERNAL_DB_NAME
             + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
-            + " tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}' "
+            + " ${inPredicate}"
             + " AND part_id IS NOT NULL";
 
     public static ColumnStatistic queryColumnStatisticsByName(long tableId, 
String colName) {
@@ -440,12 +441,30 @@ public class StatisticsRepository {
                 .replace(QUERY_COLUMN_STATISTICS));
     }
 
-    public static List<ResultRow> loadPartStats(long tableId, long idxId, 
String colName) {
+    /**
+     * Loads the partition-level rows of the statistics table for the given cache keys.
+     *
+     * <p>The keys are rendered through {@code toString()} into a single IN predicate over
+     * {@code CONCAT(tbl_id, '-', idx_id, '-', col_id)}, so one query serves the whole batch.
+     *
+     * @param keys cache keys whose partition statistics should be fetched; may be empty
+     * @return rows produced by {@code QUERY_PARTITION_STATISTICS}, or an empty list when
+     *         {@code keys} is null or empty
+     */
+    public static List<ResultRow> loadPartStats(Collection<StatisticsCacheKey> keys) {
+        // No keys would render the predicate as "in ()", an empty IN list; skip the query.
+        if (keys == null || keys.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String inPredicate = "CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)";
+        StringJoiner sj = new StringJoiner(",");
+        for (StatisticsCacheKey statisticsCacheKey : keys) {
+            // NOTE(review): values are quoted but not escaped; presumably toString() is
+            // "tblId-idxId-colId" with no quote characters in col_id -- confirm.
+            sj.add("'" + statisticsCacheKey.toString() + "'");
+        }
         Map<String, String> params = new HashMap<>();
-        params.put("tblId", String.valueOf(tableId));
-        params.put("idxId", String.valueOf(idxId));
-        params.put("colId", colName);
-
+        params.put("inPredicate", String.format(inPredicate, sj.toString()));
         return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                 .replace(QUERY_PARTITION_STATISTICS));
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 0451097967..e82a8b955c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -29,17 +29,21 @@ import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -168,6 +172,7 @@ public class StatisticsUtil {
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         sessionVariable.internalSession = true;
         
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
+        sessionVariable.cpuResourceLimit = 
Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
         sessionVariable.parallelExecInstanceNum = 
Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.parallelPipelineTaskNum = 
Config.statistics_sql_parallel_exec_instance_num;
@@ -633,7 +638,7 @@ public class StatisticsUtil {
     }
 
     private static void processDataFile(DataFile dataFile, PartitionSpec 
partitionSpec,
-                                        String colName, ColumnStatisticBuilder 
columnStatisticBuilder) {
+            String colName, ColumnStatisticBuilder columnStatisticBuilder) {
         int colId = -1;
         for (Types.NestedField column : partitionSpec.schema().columns()) {
             if (column.name().equals(colName)) {
@@ -651,4 +656,14 @@ public class StatisticsUtil {
         columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
                 + dataFile.nullValueCounts().get(colId));
     }
+
+    /**
+     * Tells whether {@code type} is listed in {@link ColumnStatistic#UNSUPPORTED_TYPE}
+     * or is an array, struct, map or variant type.
+     */
+    public static boolean isUnsupportedType(Type type) {
+        boolean listed = ColumnStatistic.UNSUPPORTED_TYPE.contains(type);
+        return listed || type instanceof ArrayType || type instanceof StructType
+                || type instanceof MapType || type instanceof VariantType;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index ca145f07f2..fa0c0fc8ef 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -62,13 +62,7 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) 
throws Exception {
-        new Expectations() {
-            {
-                scheduler.schedule((BaseAnalysisTask) any);
-                times = 3;
-            }
-        };
+    public void testCreateAnalysisJob() throws Exception {
 
         new MockUp<StatisticsUtil>() {
 
@@ -101,7 +95,7 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, 
@Mocked StmtExecutor stmtExecutor)
+    public void testJobExecution(@Mocked StmtExecutor stmtExecutor)
             throws Exception {
         new MockUp<StatisticsUtil>() {
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 8da819f09c..453eb78628 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -28,9 +28,9 @@ import 
org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.Maps;
+import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -41,8 +41,6 @@ import java.util.concurrent.BlockingQueue;
 
 public class AnalysisTaskExecutorTest extends TestWithFeService {
 
-    @Mocked
-    AnalysisTaskScheduler analysisTaskScheduler;
 
     @Override
     protected void runBeforeAll() throws Exception {
@@ -71,13 +69,7 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
                 .build();
         OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisJobInfo);
 
-        new MockUp<AnalysisTaskScheduler>() {
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return analysisJob;
-            }
-        };
-
-        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(1);
         BlockingQueue<AnalysisTaskWrapper> b = 
Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
         AnalysisTaskWrapper analysisTaskWrapper = new 
AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
         Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
@@ -102,7 +94,7 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
             }
         };
 
-        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(1);
         HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
         colToPartitions.put("col1", Collections.singleton("t1"));
         AnalysisInfo analysisInfo = new 
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
@@ -114,16 +106,16 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
                 .setColToPartitions(colToPartitions)
                 .build();
         OlapAnalysisTask task = new OlapAnalysisTask(analysisInfo);
-        new MockUp<AnalysisTaskScheduler>() {
-            @Mock
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return task;
-            }
-        };
+
         new MockUp<AnalysisManager>() {
             @Mock
             public void updateTaskStatus(AnalysisInfo info, AnalysisState 
jobState, String message, long time) {}
         };
-        Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+        new Expectations() {
+            {
+                task.doExecute();
+            }
+        };
+        Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
index d3d5245a81..0660c994a1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
@@ -30,7 +30,6 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
 import org.junit.FixMethodOrder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -43,9 +42,6 @@ import java.util.concurrent.ConcurrentMap;
 @FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
 public class HistogramTaskTest extends TestWithFeService {
 
-    @Mocked
-    AnalysisTaskScheduler analysisTaskScheduler;
-
     @Override
     protected void runBeforeAll() throws Exception {
         createDatabase("histogram_task_test");
@@ -96,7 +92,7 @@ public class HistogramTaskTest extends TestWithFeService {
 
     @Test
     public void test2TaskExecution() throws Exception {
-        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new 
AnalysisTaskExecutor(1);
         AnalysisInfo analysisInfo = new AnalysisInfoBuilder()
                 .setJobId(0).setTaskId(0).setCatalogName("internal")
                 .setDbName(SystemInfoService.DEFAULT_CLUSTER + ":" + 
"histogram_task_test").setTblName("t1")
@@ -107,17 +103,11 @@ public class HistogramTaskTest extends TestWithFeService {
                 .build();
         HistogramTask task = new HistogramTask(analysisInfo);
 
-        new MockUp<AnalysisTaskScheduler>() {
-            @Mock
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return task;
-            }
-        };
         new MockUp<AnalysisManager>() {
             @Mock
             public void updateTaskStatus(AnalysisInfo info, AnalysisState 
jobState, String message, long time) {}
         };
 
-        Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+        Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
     }
 }
diff --git a/regression-test/pipeline/p0/conf/fe.conf 
b/regression-test/pipeline/p0/conf/fe.conf
index 592bab5556..772ad23a9d 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -83,4 +83,4 @@ enable_mtmv = true
 
 dynamic_partition_check_interval_seconds=3
 
-enable_full_auto_analyze=false
+enable_full_auto_analyze=true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to