This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 36695e871ad [feature](statistics)Support auto analyze columns that haven't been analyzed for a long time. #42399 (#45250)
36695e871ad is described below

commit 36695e871ad64d765530f2ca54bd94923800fb75
Author: James <lijib...@selectdb.com>
AuthorDate: Thu Dec 12 01:57:44 2024 +0800

    [feature](statistics)Support auto analyze columns that haven't been analyzed for a long time. #42399 (#45250)
    
    backport: https://github.com/apache/doris/pull/42399
---
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../org/apache/doris/analysis/ShowAnalyzeStmt.java |   1 +
 .../apache/doris/analysis/ShowColumnStatsStmt.java |   2 +
 .../apache/doris/analysis/ShowTableStatsStmt.java  |  11 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  22 +-
 .../org/apache/doris/datasource/ExternalTable.java |   2 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |   1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +-
 .../org/apache/doris/statistics/AnalysisInfo.java  |  13 +-
 .../doris/statistics/AnalysisInfoBuilder.java      |  17 +-
 .../apache/doris/statistics/AnalysisManager.java   |  27 +-
 .../doris/statistics/AnalysisTaskExecutor.java     |   5 +-
 .../org/apache/doris/statistics/ColStatsMeta.java  |   6 +-
 .../org/apache/doris/statistics/JobPriority.java   |  25 +
 .../doris/statistics/StatisticsAutoCollector.java  | 285 ++++++------
 .../doris/statistics/StatisticsCollector.java      |  79 ----
 .../doris/statistics/StatisticsJobAppender.java    | 152 +++++++
 .../apache/doris/statistics/TableStatsMeta.java    |  16 +-
 .../doris/statistics/util/StatisticsUtil.java      |  35 ++
 .../statistics/StatisticsAutoCollectorTest.java    | 502 ++-------------------
 .../statistics/StatisticsJobAppenderTest.java      |  84 ++++
 .../doris/statistics/util/StatisticsUtilTest.java  |  85 ++++
 .../hive/test_hive_statistic_auto.groovy           |  12 +-
 .../suites/statistics/analyze_stats.groovy         |  35 +-
 24 files changed, 700 insertions(+), 726 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 359d193a5ba..ec09c850805 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2764,6 +2764,13 @@ public class Config extends ConfigBase {
     public static boolean enable_proxy_protocol = false;
     public static int profile_async_collect_expire_time_secs = 5;
 
+    @ConfField(mutable = true, description = {
+            "内表自动收集时间间隔,当某一列上次收集时间距离当前时间大于该值,则会触发一次新的收集,0表示不会触发。",
+            "Columns that have not been collected within the specified 
interval will trigger automatic analyze. "
+                    + "0 means not trigger."
+    })
+    public static long auto_analyze_interval_seconds = 86400;
+
 
     
//==========================================================================
     //                    begin of cloud config
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
index 9ccfd956ca5..f660d6eeb3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
@@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
             .add("schedule_type")
             .add("start_time")
             .add("end_time")
+            .add("priority")
             .build();
 
     private long jobId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index 18bb916b8bd..36986dc9d4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -62,6 +62,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
                     .add("trigger")
                     .add("query_times")
                     .add("updated_time")
+                    .add("last_analyze_version")
                     .build();
 
     private final TableName tableName;
@@ -162,6 +163,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
             row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.jobType));
             row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.queriedTimes));
             row.add(String.valueOf(p.second.updatedTime));
+            row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.tableVersion));
             result.add(row);
         });
         return new ShowResultSet(getMetaData(), result);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
index 91b8bf1de2d..915b5f19e03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
@@ -57,6 +57,7 @@ public class ShowTableStatsStmt extends ShowStmt {
                     .add("trigger")
                     .add("new_partition")
                     .add("user_inject")
+                    .add("last_analyze_time")
                     .build();
 
     private static final ImmutableList<String> INDEX_TITLE_NAMES =
@@ -192,6 +193,7 @@ public class ShowTableStatsStmt extends ShowStmt {
             row.add("");
             row.add("");
             row.add("");
+            row.add("");
             result.add(row);
             return new ShowResultSet(getMetaData(), result);
         }
@@ -201,15 +203,18 @@ public class ShowTableStatsStmt extends ShowStmt {
         row.add(String.valueOf(tableStatistic.updatedRows));
         row.add(String.valueOf(tableStatistic.queriedTimes.get()));
         row.add(String.valueOf(tableStatistic.rowCount));
-        LocalDateTime dateTime =
+        LocalDateTime tableUpdateTime =
                 
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
                 java.time.ZoneId.systemDefault());
-        String formattedDateTime = dateTime.format(formatter);
-        row.add(formattedDateTime);
+        LocalDateTime lastAnalyzeTime =
+                
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
+                        java.time.ZoneId.systemDefault());
+        row.add(tableUpdateTime.format(formatter));
         row.add(tableStatistic.analyzeColumns().toString());
         row.add(tableStatistic.jobType.toString());
         row.add(String.valueOf(tableStatistic.newPartitionLoaded.get()));
         row.add(String.valueOf(tableStatistic.userInjected));
+        row.add(lastAnalyzeTime.format(formatter));
         result.add(row);
         return new ShowResultSet(getMetaData(), result);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index cf0885e6ec2..321d2f53fc7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -258,6 +258,7 @@ import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.statistics.StatisticsAutoCollector;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsJobAppender;
 import org.apache.doris.statistics.query.QueryStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -540,6 +541,7 @@ public class Env {
     private final LoadManagerAdapter loadManagerAdapter;
 
     private StatisticsAutoCollector statisticsAutoCollector;
+    private StatisticsJobAppender statisticsJobAppender;
 
     private HiveTransactionMgr hiveTransactionMgr;
 
@@ -780,6 +782,7 @@ public class Env {
         this.analysisManager = new AnalysisManager();
         this.statisticsCleaner = new StatisticsCleaner();
         this.statisticsAutoCollector = new StatisticsAutoCollector();
+        this.statisticsJobAppender = new 
StatisticsJobAppender("StatisticsJobAppender");
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
@@ -1078,12 +1081,6 @@ public class Env {
             // If not using bdb, we need to notify the FE type transfer 
manually.
             notifyNewFETypeTransfer(FrontendNodeType.MASTER);
         }
-        if (statisticsCleaner != null) {
-            statisticsCleaner.start();
-        }
-        if (statisticsAutoCollector != null) {
-            statisticsAutoCollector.start();
-        }
 
         queryCancelWorker.start();
     }
@@ -1623,6 +1620,15 @@ public class Env {
             if (analysisManager != null) {
                 analysisManager.getStatisticsCache().preHeat();
             }
+            if (statisticsCleaner != null) {
+                statisticsCleaner.start();
+            }
+            if (statisticsAutoCollector != null) {
+                statisticsAutoCollector.start();
+            }
+            if (statisticsJobAppender != null) {
+                statisticsJobAppender.start();
+            }
         } catch (Throwable e) {
             // When failed to transfer to master, we need to exit the process.
             // Otherwise, the process will be in an unknown state.
@@ -6327,6 +6333,10 @@ public class Env {
         return statisticsAutoCollector;
     }
 
+    public StatisticsJobAppender getStatisticsJobAppender() {
+        return statisticsJobAppender;
+    }
+
     public NereidsSqlCacheManager getSqlCacheManager() {
         return sqlCacheManager;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 1d7d87d27f8..716f2bdca44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -350,7 +350,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
             return true;
         }
         return System.currentTimeMillis()
-            - tblStats.updatedTime > 
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
+            - tblStats.lastAnalyzeTime > 
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 741f20dd88f..fc92efd7f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2799,6 +2799,7 @@ public class ShowExecutor {
                                 java.time.ZoneId.systemDefault());
                 row.add(startTime.format(formatter));
                 row.add(endTime.format(formatter));
+                row.add(analysisInfo.priority == null ? "N/A" : 
analysisInfo.priority.name());
                 resultRows.add(row);
             } catch (Exception e) {
                 LOG.warn("Failed to get analyze info for table {}.{}.{}, 
reason: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0b8d8c229cd..f314f3aa76c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2513,7 +2513,7 @@ public class StmtExecutor {
         context.getState().setOk();
     }
 
-    private void handleAnalyzeStmt() throws DdlException, AnalysisException {
+    private void handleAnalyzeStmt() throws DdlException, AnalysisException, 
ExecutionException, InterruptedException {
         context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) 
parsedStmt, isProxy);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 125b23bce7b..463b53bf645 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -195,6 +195,13 @@ public class AnalysisInfo implements Writable {
 
     @SerializedName("rowCount")
     public final long rowCount;
+
+    @SerializedName("priority")
+    public final JobPriority priority;
+
+    @SerializedName("tv")
+    public final long tableVersion;
+
     /**
      *
      * Used to store the newest partition version of tbl when creating this 
job.
@@ -214,7 +221,7 @@ public class AnalysisInfo implements Writable {
             boolean isExternalTableLevelTask, boolean partitionOnly, boolean 
samplingPartition,
             boolean isAllPartition, long partitionCount, CronExpression 
cronExpression, boolean forceFull,
             boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean 
emptyJob, boolean userInject,
-            long rowCount) {
+            long rowCount, JobPriority priority, long tableVersion) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.taskIds = taskIds;
@@ -253,6 +260,8 @@ public class AnalysisInfo implements Writable {
         this.emptyJob = emptyJob;
         this.userInject = userInject;
         this.rowCount = rowCount;
+        this.priority = priority;
+        this.tableVersion = tableVersion;
     }
 
     @Override
@@ -295,6 +304,8 @@ public class AnalysisInfo implements Writable {
         sj.add("forceFull: " + forceFull);
         sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
         sj.add("emptyJob: " + emptyJob);
+        sj.add("priority: " + priority.name());
+        sj.add("tableVersion: " + tableVersion);
         return sj.toString();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
index 2e7c4078ca1..2dd79030220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
@@ -65,6 +65,8 @@ public class AnalysisInfoBuilder {
     private boolean emptyJob;
     private boolean userInject = false;
     private long rowCount;
+    private JobPriority priority;
+    private long tableVersion;
 
     public AnalysisInfoBuilder() {
     }
@@ -105,6 +107,8 @@ public class AnalysisInfoBuilder {
         emptyJob = info.emptyJob;
         userInject = info.userInject;
         rowCount = info.rowCount;
+        priority = info.priority;
+        tableVersion = info.tableVersion;
     }
 
     public AnalysisInfoBuilder setJobId(long jobId) {
@@ -282,12 +286,23 @@ public class AnalysisInfoBuilder {
         return this;
     }
 
+    public AnalysisInfoBuilder setPriority(JobPriority priority) {
+        this.priority = priority;
+        return this;
+    }
+
+    public AnalysisInfoBuilder setTableVersion(long tableVersion) {
+        this.tableVersion = tableVersion;
+        return this;
+    }
+
     public AnalysisInfo build() {
         return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, 
tblId, jobColumns, partitionNames,
                 colName, indexId, jobType, analysisMode, analysisMethod, 
analysisType, samplePercent,
                 sampleRows, maxBucketNum, periodTimeInMs, message, 
lastExecTimeInMs, timeCostInMs, state, scheduleType,
                 externalTableLevelTask, partitionOnly, samplingPartition, 
isAllPartition, partitionCount,
-                cronExpression, forceFull, usingSqlForPartitionColumn, 
tblUpdateTime, emptyJob, userInject, rowCount);
+                cronExpression, forceFull, usingSqlForPartitionColumn, 
tblUpdateTime, emptyJob, userInject, rowCount,
+                priority, tableVersion);
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 5f6206e854e..0e4a1c7b42d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -99,6 +99,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -144,7 +145,8 @@ public class AnalysisManager implements Writable {
         return statisticsCache;
     }
 
-    public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws 
DdlException, AnalysisException {
+    public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy)
+            throws DdlException, AnalysisException, ExecutionException, 
InterruptedException {
         if (!StatisticsUtil.statsTblAvailable() && 
!FeConstants.runningUnitTest) {
             throw new DdlException("Stats table not available, please make 
sure your cluster status is normal");
         }
@@ -157,10 +159,8 @@ public class AnalysisManager implements Writable {
 
     public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) 
throws DdlException, AnalysisException {
         DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
-        // Using auto analyzer if user specifies.
         if 
(analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer"))
 {
-            Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db);
-            return;
+            throw new DdlException("Analyze database doesn't support 
use.auto.analyzer property.");
         }
         List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, 
analyzeDBStmt.getAnalyzeProperties());
         if (!analyzeDBStmt.isSync()) {
@@ -208,22 +208,12 @@ public class AnalysisManager implements Writable {
     }
 
     // Each analyze stmt corresponding to an analysis job.
-    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws 
DdlException, AnalysisException {
+    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy)
+            throws DdlException, AnalysisException, ExecutionException, 
InterruptedException {
         // Using auto analyzer if user specifies.
         if 
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
             StatisticsAutoCollector autoCollector = 
Env.getCurrentEnv().getStatisticsAutoCollector();
-            if (autoCollector.skip(stmt.getTable())) {
-                return;
-            }
-            List<AnalysisInfo> jobs = new ArrayList<>();
-            autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs, 
stmt.getTable());
-            if (jobs.isEmpty()) {
-                return;
-            }
-            AnalysisInfo job = 
autoCollector.getNeedAnalyzeColumns(jobs.get(0));
-            if (job != null) {
-                
Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job);
-            }
+            autoCollector.processOneJob(stmt.getTable(), 
JobPriority.MANUAL_AUTO);
             return;
         }
         AnalysisInfo jobInfo = buildAndAssignJob(stmt);
@@ -347,7 +337,6 @@ public class AnalysisManager implements Writable {
         infoBuilder.setAnalysisMode(analysisMode);
         infoBuilder.setAnalysisMethod(analysisMethod);
         infoBuilder.setScheduleType(scheduleType);
-        infoBuilder.setLastExecTimeInMs(0);
         infoBuilder.setCronExpression(cronExpression);
         infoBuilder.setForceFull(stmt.forceFull());
         
infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn());
@@ -377,6 +366,8 @@ public class AnalysisManager implements Writable {
                 && analysisMethod.equals(AnalysisMethod.SAMPLE));
         long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 
: table.getRowCount();
         infoBuilder.setRowCount(rowCount);
+        infoBuilder.setPriority(JobPriority.MANUAL);
+        infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) 
table).getVisibleVersion() : 0);
         return infoBuilder.build();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 5277d8025fc..3db9a862d10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.Comparator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -88,9 +89,9 @@ public class AnalysisTaskExecutor {
         }
     }
 
-    public void submitTask(BaseAnalysisTask task) {
+    public Future<?> submitTask(BaseAnalysisTask task) {
         AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
-        executors.submit(taskWrapper);
+        return executors.submit(taskWrapper);
     }
 
     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
index 445641b2505..f9da52b01cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
@@ -43,13 +43,17 @@ public class ColStatsMeta {
     @SerializedName("trigger")
     public JobType jobType;
 
+    @SerializedName("tv")
+    public long tableVersion;
+
     public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
-            AnalysisType analysisType, JobType jobType, long queriedTimes) {
+            AnalysisType analysisType, JobType jobType, long queriedTimes, 
long tableVersion) {
         this.updatedTime = updatedTime;
         this.analysisMethod = analysisMethod;
         this.analysisType = analysisType;
         this.jobType = jobType;
         this.queriedTimes.addAndGet(queriedTimes);
+        this.tableVersion = tableVersion;
     }
 
     public void clear() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
new file mode 100644
index 00000000000..2d45dad877b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+public enum JobPriority {
+    HIGH,
+    LOW,
+    MANUAL,
+    MANUAL_AUTO;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index c1a8af93ac0..1d7818a8e8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -18,172 +18,125 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
-import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.LocalTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.StringJoiner;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public class StatisticsAutoCollector extends StatisticsCollector {
+public class StatisticsAutoCollector extends MasterDaemon {
 
     private static final Logger LOG = 
LogManager.getLogger(StatisticsAutoCollector.class);
 
+    public static final int JOB_QUEUE_LIMIT = 100;
+    private final BlockingQueue<TableIf> highPriorityJobs = new 
ArrayBlockingQueue<>(JOB_QUEUE_LIMIT);
+    private final BlockingQueue<TableIf> lowPriorityJobs = new 
ArrayBlockingQueue<>(JOB_QUEUE_LIMIT);
+
+    protected final AnalysisTaskExecutor analysisTaskExecutor;
+
     public StatisticsAutoCollector() {
-        super("Automatic Analyzer",
-                
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
-                new 
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
-                        StatisticConstants.TASK_QUEUE_CAP));
+        super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10));
+        this.analysisTaskExecutor = new 
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
+                StatisticConstants.TASK_QUEUE_CAP);
     }
 
     @Override
-    protected void collect() {
-        if (canCollect()) {
-            analyzeAll();
+    protected void runAfterCatalogReady() {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (!StatisticsUtil.statsTblAvailable()) {
+            LOG.info("Stats table not available, skip");
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+        try {
+            collect();
+        } catch (DdlException | ExecutionException | InterruptedException e) {
+            LOG.warn("One auto analyze job failed. ", e);
         }
     }
 
-    protected boolean canCollect() {
-        return StatisticsUtil.enableAutoAnalyze()
-            && 
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
-    }
-
-    protected void analyzeAll() {
-        List<CatalogIf> catalogs = getCatalogsInOrder();
-        for (CatalogIf ctl : catalogs) {
-            if (!canCollect()) {
-                analysisTaskExecutor.clear();
-                break;
-            }
-            if (!ctl.enableAutoAnalyze()) {
-                continue;
+    protected void collect() throws DdlException, ExecutionException, 
InterruptedException {
+        if (!StatisticsUtil.canCollect()) {
+            return;
+        }
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        while (true) {
+            Pair<TableIf, JobPriority> jobPair = fetchOneJob();
+            TableIf table = jobPair.first;
+            if (table == null) {
+                return;
             }
-            List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
-            for (DatabaseIf<TableIf> databaseIf : dbs) {
-                if (!canCollect()) {
-                    analysisTaskExecutor.clear();
-                    break;
-                }
-                if 
(StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
-                    continue;
-                }
-                try {
-                    analyzeDb(databaseIf);
-                } catch (Throwable t) {
-                    LOG.warn("Failed to analyze database {}.{}", 
ctl.getName(), databaseIf.getFullName(), t);
-                    continue;
-                }
+            TableStatsMeta tblStats = 
analysisManager.findTableStatsStatus(table.getId());
+            if (table.needReAnalyzeTable(tblStats) || 
StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) {
+                processOneJob(table, jobPair.second);
             }
         }
     }
 
-    public List<CatalogIf> getCatalogsInOrder() {
-        return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
-            .sorted((c1, c2) -> (int) (c1.getId() - 
c2.getId())).collect(Collectors.toList());
-    }
-
-    public List<DatabaseIf<? extends TableIf>> 
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
-        return catalog.getAllDbs().stream()
-            .sorted((d1, d2) -> (int) (d1.getId() - 
d2.getId())).collect(Collectors.toList());
-    }
-
-    public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
-        return db.getTables().stream()
-            .sorted((t1, t2) -> (int) (t1.getId() - 
t2.getId())).collect(Collectors.toList());
-    }
-
-    public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
-        List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
-        for (AnalysisInfo analysisInfo : analysisInfos) {
-            try {
-                if (!canCollect()) {
-                    analysisTaskExecutor.clear();
-                    break;
-                }
-                analysisInfo = getNeedAnalyzeColumns(analysisInfo);
-                if (analysisInfo == null) {
-                    continue;
-                }
-                createSystemAnalysisJob(analysisInfo);
-            } catch (Throwable t) {
-                analysisInfo.message = t.getMessage();
-                LOG.warn("Failed to auto analyze table {}.{}, reason {}",
-                        databaseIf.getFullName(), analysisInfo.tblId, 
analysisInfo.message, t);
-                continue;
-            }
+    protected Pair<TableIf, JobPriority> fetchOneJob() {
+        TableIf table = null;
+        JobPriority priority = null;
+        try {
+            table = highPriorityJobs.poll(1, TimeUnit.SECONDS);
+            priority = JobPriority.HIGH;
+        } catch (InterruptedException e) {
+            LOG.debug(e);
         }
-    }
-
-    protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends 
TableIf> db) {
-        List<AnalysisInfo> analysisInfos = new ArrayList<>();
-        for (TableIf table : getTablesInOrder(db)) {
+        if (table == null) {
             try {
-                if (skip(table)) {
-                    continue;
-                }
-                createAnalyzeJobForTbl(db, analysisInfos, table);
-            } catch (Throwable t) {
-                LOG.warn("Failed to analyze table {}.{}.{}",
-                        db.getCatalog().getName(), db.getFullName(), 
table.getName(), t);
+                table = lowPriorityJobs.poll(1, TimeUnit.SECONDS);
+                priority = JobPriority.LOW;
+            } catch (InterruptedException e) {
+                LOG.debug(e);
             }
         }
-        return analysisInfos;
-    }
-
-    // return true if skip auto analyze this time.
-    protected boolean skip(TableIf table) {
-        if (!(table instanceof OlapTable || table instanceof 
HMSExternalTable)) {
-            return true;
-        }
-        // For now, only support Hive HMS table auto collection.
-        if (table instanceof HMSExternalTable
-                && !((HMSExternalTable) 
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
-            return true;
-        }
-        if (table.getDataSize(true) < 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
-            return false;
-        }
-        TableStatsMeta tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
-        // means it's never got analyzed or new partition loaded data.
-        if (tableStats == null || tableStats.newPartitionLoaded.get()) {
-            return false;
+        if (table == null) {
+            LOG.debug("Job queues are all empty.");
         }
-        if (tableStats.userInjected) {
-            return true;
-        }
-        return System.currentTimeMillis()
-                - tableStats.updatedTime < 
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+        return Pair.of(table, priority);
     }
 
-    protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
-            List<AnalysisInfo> analysisInfos, TableIf table) {
-        AnalysisMethod analysisMethod = table.getDataSize(true) >= 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
-                ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
-        if (table instanceof OlapTable && 
analysisMethod.equals(AnalysisMethod.SAMPLE)) {
-            OlapTable ot = (OlapTable) table;
-            if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == 
TableIf.UNKNOWN_ROW_COUNT) {
-                LOG.info("Table {} row count is not fully reported, skip auto 
analyzing this time.", ot.getName());
-                return;
-            }
+    protected void processOneJob(TableIf table, JobPriority priority)
+                throws DdlException, ExecutionException, InterruptedException {
+        List<Pair<String, String>> needRunColumns = table.getColumnIndexPairs(
+                table.getSchemaAllIndexes(false)
+                        .stream()
+                        .filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
+                        .map(Column::getName)
+                        .collect(Collectors.toSet()));
+        if (needRunColumns == null || needRunColumns.isEmpty()) {
+            return;
+        }
+        AnalysisMethod analysisMethod =
+                table.getDataSize(true) >= 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
+                        ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
+        if (!tableRowCountReported(table, analysisMethod)) {
+            return;
         }
         // We don't auto analyze empty table to avoid all 0 stats.
         // Because all 0 is more dangerous than unknown stats when row count 
report is delayed.
@@ -198,12 +151,33 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             }
             return;
         }
+        StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
+        for (Pair<String, String> pair : needRunColumns) {
+            stringJoiner.add(pair.toString());
+        }
+        AnalysisInfo jobInfo = createAnalysisInfo(table, analysisMethod, 
rowCount,
+                    stringJoiner.toString(), needRunColumns, priority);
+        executeSystemAnalysisJob(jobInfo);
+    }
+
+    /**
+     * Checks whether the row count needed for a sampled auto analyze has been reported.
+     *
+     * @param table table about to be analyzed
+     * @param analysisMethod method chosen for this run
+     * @return false only for an OlapTable analyzed with SAMPLE whose base index row count is still
+     *         TableIf.UNKNOWN_ROW_COUNT; true in every other case
+     */
+    protected boolean tableRowCountReported(TableIf table, AnalysisMethod analysisMethod) {
+        // Only sampled OLAP tables are subject to the row count check.
+        if (!(table instanceof OlapTable) || !analysisMethod.equals(AnalysisMethod.SAMPLE)) {
+            return true;
+        }
+        OlapTable olapTable = (OlapTable) table;
+        if (olapTable.getRowCountForIndex(olapTable.getBaseIndexId(), true) != TableIf.UNKNOWN_ROW_COUNT) {
+            return true;
+        }
+        LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", olapTable.getName());
+        return false;
+    }
+
+    protected AnalysisInfo createAnalysisInfo(TableIf table, AnalysisMethod 
analysisMethod, long rowCount,
+            String colNames, List<Pair<String, String>> needRunColumns, 
JobPriority priority) {
         AnalysisInfo jobInfo = new AnalysisInfoBuilder()
                 .setJobId(Env.getCurrentEnv().getNextId())
-                .setCatalogId(db.getCatalog().getId())
-                .setDBId(db.getId())
+                .setCatalogId(table.getDatabase().getCatalog().getId())
+                .setDBId(table.getDatabase().getId())
                 .setTblId(table.getId())
-                .setColName(null)
                 .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                 .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
                 .setAnalysisMethod(analysisMethod)
@@ -216,37 +190,46 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
                 .setJobType(JobType.SYSTEM)
                 .setTblUpdateTime(table.getUpdateTime())
                 .setEmptyJob(table instanceof OlapTable && table.getRowCount() 
== 0
-                    && analysisMethod.equals(AnalysisMethod.SAMPLE))
+                        && analysisMethod.equals(AnalysisMethod.SAMPLE))
                 .setRowCount(rowCount)
+                .setColName(colNames)
+                .setJobColumns(needRunColumns)
+                .setPriority(priority)
+                .setTableVersion(table instanceof OlapTable ? ((OlapTable) 
table).getVisibleVersion() : 0)
                 .build();
-        analysisInfos.add(jobInfo);
+        return jobInfo;
     }
 
-    @VisibleForTesting
-    protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) {
-        TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, 
jobInfo.dbId, jobInfo.tblId);
-        // Skip tables that are too wide.
-        if (table.getBaseSchema().size() > 
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
-            return null;
+    // Analysis job created by the system.
+    // Returns immediately when jobInfo.jobColumns is empty. Otherwise it asks the AnalysisManager to
+    // create the tasks for jobInfo (plus a table level task when StatisticsUtil.isExternalTable is true),
+    // constructs and registers the job, submits every task to analysisTaskExecutor and then waits on
+    // each returned Future in turn, so this call does not return until all submitted tasks are done.
+    protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
+            throws DdlException, ExecutionException, InterruptedException {
+        if (jobInfo.jobColumns.isEmpty()) {
+            // No statistics need to be collected or updated
+            return;
         }
-
-        AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
-        TableStatsMeta tblStats = 
analysisManager.findTableStatsStatus(table.getId());
-
-        List<Pair<String, String>> needRunColumns = null;
-        if (table.needReAnalyzeTable(tblStats)) {
-            needRunColumns = 
table.getColumnIndexPairs(table.getSchemaAllIndexes(false)
-                .stream().map(Column::getName).collect(Collectors.toSet()));
+        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
+        if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, 
jobInfo.tblId)) {
+            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTasks, false);
         }
-
-        if (needRunColumns == null || needRunColumns.isEmpty()) {
-            return null;
+        Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, 
analysisTasks.values());
+        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTasks);
+        Future<?>[] futures = new Future[analysisTasks.values().size()];
+        int i = 0;
+        for (BaseAnalysisTask task : analysisTasks.values()) {
+            futures[i++] = analysisTaskExecutor.submitTask(task);
         }
-        StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
-        for (Pair<String, String> pair : needRunColumns) {
-            stringJoiner.add(pair.toString());
+        // Wait for the tasks one by one; the first get() that throws ends the wait for the rest.
+        for (Future future : futures) {
+            future.get();
         }
-        return new AnalysisInfoBuilder(jobInfo)
-            
.setColName(stringJoiner.toString()).setJobColumns(needRunColumns).build();
+    }
+
+    /**
+     * Adds {@code table} to the high priority job queue using {@code put}.
+     * NOTE(review): the declared InterruptedException suggests this can block until the queue
+     * has room - confirm against the declaration of highPriorityJobs.
+     *
+     * @param table table to enqueue
+     * @throws InterruptedException propagated from {@code highPriorityJobs.put}
+     */
+    public void appendToHighPriorityJobs(TableIf table) throws 
InterruptedException {
+        highPriorityJobs.put(table);
+    }
+
+    /**
+     * Offers {@code table} to the low priority job queue without declaring any checked exception.
+     *
+     * @param table table to enqueue
+     * @return whatever {@code lowPriorityJobs.offer} returns; presumably false when the queue
+     *         rejects the element (e.g. it is full) - confirm against the queue's declaration
+     */
+    public boolean appendToLowPriorityJobs(TableIf table) {
+        return lowPriorityJobs.offer(table);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
deleted file mode 100644
index ec187fe893a..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.MasterDaemon;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class StatisticsCollector extends MasterDaemon {
-
-    private static final Logger LOG = 
LogManager.getLogger(StatisticsCollector.class);
-
-    protected final AnalysisTaskExecutor analysisTaskExecutor;
-
-    public StatisticsCollector(String name, long intervalMs, 
AnalysisTaskExecutor analysisTaskExecutor) {
-        super(name, intervalMs);
-        this.analysisTaskExecutor = analysisTaskExecutor;
-    }
-
-    @Override
-    protected void runAfterCatalogReady() {
-        if (!Env.getCurrentEnv().isMaster()) {
-            return;
-        }
-        if (!StatisticsUtil.statsTblAvailable()) {
-            LOG.info("Stats table not available, skip");
-            return;
-        }
-        if (Env.isCheckpointThread()) {
-            return;
-        }
-        collect();
-    }
-
-    protected abstract void collect();
-
-    // Analysis job created by the system
-    @VisibleForTesting
-    protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
-            throws DdlException {
-        if (jobInfo.jobColumns.isEmpty()) {
-            // No statistics need to be collected or updated
-            return;
-        }
-        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
-        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
-        if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, 
jobInfo.tblId)) {
-            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTasks, false);
-        }
-        Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, 
analysisTasks.values());
-        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTasks);
-        analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
new file mode 100644
index 00000000000..3703cf236c0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StatisticsJobAppender extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsJobAppender.class);
+
+    public StatisticsJobAppender(String name) {
+        super(name, 
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (!StatisticsUtil.statsTblAvailable()) {
+            LOG.info("Stats table not available, skip");
+            return;
+        }
+        if (Env.getCurrentEnv().getStatisticsAutoCollector() == null) {
+            LOG.info("Statistics auto collector not ready, skip");
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+        if (!Env.getCurrentEnv().isReady()) {
+            return;
+        }
+        if (!StatisticsUtil.canCollect()) {
+            LOG.debug("Auto analyze not enabled or not in analyze time 
range.");
+            return;
+        }
+        traverseAllTables();
+    }
+
+    protected void traverseAllTables() {
+        List<CatalogIf> catalogs = getCatalogsInOrder();
+        AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
+        StatisticsAutoCollector autoCollector = 
Env.getCurrentEnv().getStatisticsAutoCollector();
+        for (CatalogIf ctl : catalogs) {
+            if (!StatisticsUtil.canCollect()) {
+                break;
+            }
+            if (!ctl.enableAutoAnalyze()) {
+                continue;
+            }
+            List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
+            for (DatabaseIf<TableIf> db : dbs) {
+                if (!StatisticsUtil.canCollect()) {
+                    break;
+                }
+                if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) {
+                    continue;
+                }
+                for (TableIf table : getTablesInOrder(db)) {
+                    try {
+                        if (skip(table)) {
+                            continue;
+                        }
+                        TableStatsMeta tblStats = 
analysisManager.findTableStatsStatus(table.getId());
+                        if (table.needReAnalyzeTable(tblStats)) {
+                            autoCollector.appendToHighPriorityJobs(table);
+                        } else if 
(StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) {
+                            autoCollector.appendToLowPriorityJobs(table);
+                        }
+                    } catch (Throwable t) {
+                        LOG.warn("Failed to analyze table {}.{}.{}",
+                                ctl.getName(), db.getFullName(), 
table.getName(), t);
+                    }
+                }
+            }
+        }
+    }
+
+    public List<CatalogIf> getCatalogsInOrder() {
+        return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
+                .sorted((c1, c2) -> (int) (c1.getId() - 
c2.getId())).collect(Collectors.toList());
+    }
+
+    public List<DatabaseIf<? extends TableIf>> 
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
+        return catalog.getAllDbs().stream()
+                .sorted((d1, d2) -> (int) (d1.getId() - 
d2.getId())).collect(Collectors.toList());
+    }
+
+    public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
+        return db.getTables().stream()
+                .sorted((t1, t2) -> (int) (t1.getId() - 
t2.getId())).collect(Collectors.toList());
+    }
+
+    // return true if skip auto analyze this time.
+    protected boolean skip(TableIf table) {
+        if (!(table instanceof OlapTable || table instanceof 
HMSExternalTable)) {
+            return true;
+        }
+        // For now, only support Hive HMS table auto collection.
+        if (table instanceof HMSExternalTable
+                && !((HMSExternalTable) 
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
+            return true;
+        }
+        // Skip wide table.
+        if (table.getBaseSchema().size() > 
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
+            return true;
+        }
+        if (table.getDataSize(true) < 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
+            return false;
+        }
+        TableStatsMeta tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+        // means it's never got analyzed or new partition loaded data.
+        if (tableStats == null || tableStats.newPartitionLoaded.get()) {
+            return false;
+        }
+        if (tableStats.userInjected) {
+            return true;
+        }
+        return System.currentTimeMillis()
+                - tableStats.lastAnalyzeTime < 
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 50739e98aea..33f099e2b0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable, 
GsonPostProcessable {
     @SerializedName("updateTime")
     public long updatedTime;
 
+    @SerializedName("lat")
+    public long lastAnalyzeTime;
+
     @SerializedName("colNameToColStatsMeta")
     private ConcurrentMap<String, ColStatsMeta> 
deprecatedColNameToColStatsMeta = new ConcurrentHashMap<>();
 
@@ -156,19 +159,21 @@ public class TableStatsMeta implements Writable, 
GsonPostProcessable {
 
     public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
         updatedTime = analyzedJob.tblUpdateTime;
+        lastAnalyzeTime = analyzedJob.createTime;
         if (analyzedJob.userInject) {
             userInjected = true;
         }
         for (Pair<String, String> colPair : analyzedJob.jobColumns) {
             ColStatsMeta colStatsMeta = colToColStatsMeta.get(colPair);
             if (colStatsMeta == null) {
-                colToColStatsMeta.put(colPair, new ColStatsMeta(updatedTime,
-                        analyzedJob.analysisMethod, analyzedJob.analysisType, 
analyzedJob.jobType, 0));
+                colToColStatsMeta.put(colPair, new 
ColStatsMeta(lastAnalyzeTime, analyzedJob.analysisMethod,
+                        analyzedJob.analysisType, analyzedJob.jobType, 0, 
analyzedJob.tableVersion));
             } else {
-                colStatsMeta.updatedTime = updatedTime;
+                colStatsMeta.updatedTime = lastAnalyzeTime;
                 colStatsMeta.analysisType = analyzedJob.analysisType;
                 colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
                 colStatsMeta.jobType = analyzedJob.jobType;
+                colStatsMeta.tableVersion = analyzedJob.tableVersion;
             }
         }
         jobType = analyzedJob.jobType;
@@ -233,4 +238,9 @@ public class TableStatsMeta implements Writable, 
GsonPostProcessable {
     public boolean isColumnsStatsEmpty() {
         return colToColStatsMeta == null || colToColStatsMeta.isEmpty();
     }
+
+    /**
+     * Test hook: replaces the column stats map of this object with the given map.
+     * The reference is stored as is; no copy is made.
+     *
+     * @param colToColStatsMeta map to install
+     */
+    @VisibleForTesting
+    public void setColToColStatsMeta(ConcurrentMap<Pair<String, String>, 
ColStatsMeta> colToColStatsMeta) {
+        this.colToColStatsMeta = colToColStatsMeta;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 288eb88e95f..718260a25b0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -51,6 +51,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.InternalCatalog;
@@ -70,6 +71,7 @@ import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.StatisticConstants;
+import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.system.Frontend;
 
 import com.google.common.base.Preconditions;
@@ -904,4 +906,37 @@ public class StatisticsUtil {
         return rowCount == 0;
     }
 
+    public static boolean canCollect() {
+        return enableAutoAnalyze() && 
inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
+    }
+
+    /**
+     * Decides whether an OLAP table has gone without analyze for longer than
+     * {@code Config.auto_analyze_interval_seconds} and has changed since then.
+     *
+     * @param table table to check; null yields false
+     * @param tblStats stats meta of the table; null yields false
+     * @return false for null arguments, user injected stats, non OLAP tables, a non positive
+     *         interval or a last analyze inside the interval; otherwise true when updated rows
+     *         is non zero, the row count differs from the recorded one, or any analyzed column
+     *         was recorded at a different table version (or has no recorded meta)
+     */
+    public static boolean tableNotAnalyzedForTooLong(TableIf table, TableStatsMeta tblStats) {
+        if (table == null || tblStats == null) {
+            LOG.warn("Table or stats is null.");
+            return false;
+        }
+        if (tblStats.userInjected) {
+            return false;
+        }
+        if (!(table instanceof OlapTable)) {
+            return false;
+        }
+        if (Config.auto_analyze_interval_seconds <= 0) {
+            return false;
+        }
+        // 1000L forces long arithmetic, so the conversion cannot overflow should the config be an int.
+        long intervalMs = Config.auto_analyze_interval_seconds * 1000L;
+        if (System.currentTimeMillis() - tblStats.lastAnalyzeTime <= intervalMs) {
+            return false;
+        }
+        // For OlapTable, if update rows is 0, row count doesn't change since last analyze
+        // and table visible version doesn't change since last analyze. Then we skip analyzing it.
+        if (tblStats.updatedRows.get() != 0) {
+            return true;
+        }
+        long rowCount = table.getRowCount();
+        if (rowCount != tblStats.rowCount) {
+            return true;
+        }
+        long visibleVersion = ((OlapTable) table).getVisibleVersion();
+        // A column without recorded meta is treated as stale instead of dereferencing null.
+        return tblStats.analyzeColumns().stream()
+                .map(c -> tblStats.findColumnStatsMeta(c.first, c.second))
+                .anyMatch(meta -> meta == null || meta.tableVersion != visibleVersion);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
index 4f40071f501..d687d111d2c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -17,504 +17,102 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.catalog.View;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Injectable;
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutionException;
 
 public class StatisticsAutoCollectorTest {
 
     @Test
-    public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
-        new MockUp<CatalogIf>() {
-            @Mock
-            public Collection<DatabaseIf> getAllDbs() {
-                Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME);
-                Database db2 = new Database(2, "anyDB");
-                List<DatabaseIf> databaseIfs = new ArrayList<>();
-                databaseIfs.add(db1);
-                databaseIfs.add(db2);
-                return databaseIfs;
-            }
-        };
+    public void testCollect() throws DdlException, ExecutionException, 
InterruptedException {
+        StatisticsAutoCollector collector = new StatisticsAutoCollector();
+        final int[] count = {0, 0};
         new MockUp<StatisticsAutoCollector>() {
             @Mock
-            public List<AnalysisInfo> 
constructAnalysisInfo(DatabaseIf<TableIf> db) {
-                return Arrays.asList(analysisInfo, analysisInfo);
-            }
-
-            int count = 0;
-
-            @Mock
-            public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) 
{
-                return count++ == 0 ? null : jobInfo;
-            }
-
-            @Mock
-            public void createSystemAnalysisJob(AnalysisInfo jobInfo)
-                    throws DdlException {
-
-            }
-        };
-
-        StatisticsAutoCollector saa = new StatisticsAutoCollector();
-        saa.runAfterCatalogReady();
-        new Expectations() {
-            {
-                try {
-                    saa.createSystemAnalysisJob((AnalysisInfo) any);
-                    times = 1;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    @Test
-    public void testConstructAnalysisInfo(
-            @Injectable OlapTable o2, @Injectable View v) {
-        new MockUp<Database>() {
-            @Mock
-            public List<Table> getTables() {
-                List<Table> tableIfs = new ArrayList<>();
-                tableIfs.add(o2);
-                tableIfs.add(v);
-                return tableIfs;
-            }
-
-            @Mock
-            public String getFullName() {
-                return "anyDb";
-            }
-        };
-
-        new MockUp<OlapTable>() {
-            @Mock
-            public String getName() {
-                return "anytable";
-            }
-
-            @Mock
-            public List<Column> getSchemaAllIndexes(boolean full) {
-                List<Column> columns = new ArrayList<>();
-                columns.add(new Column("c1", PrimitiveType.INT));
-                columns.add(new Column("c2", PrimitiveType.HLL));
-                return columns;
-            }
-
-            @Mock
-            public long getRowCount() {
-                return 1;
-            }
-        };
-        StatisticsAutoCollector saa = new StatisticsAutoCollector();
-        List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new 
Database(1, "anydb"));
-        Assertions.assertEquals(1, analysisInfoList.size());
-        Assertions.assertNull(analysisInfoList.get(0).colName);
-    }
-
-    @Test
-    public void testSkipWideTable() {
-
-        TableIf tableIf = new OlapTable();
-
-        new MockUp<OlapTable>() {
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("col1", Type.INT), new 
Column("col2", Type.INT));
-            }
-
-            @Mock
-            public List<Pair<String, String>> getColumnIndexPairs(Set<String> 
columns) {
-                ArrayList<Pair<String, String>> list = Lists.newArrayList();
-                list.add(Pair.of("1", "1"));
-                return list;
+            protected Pair<TableIf, JobPriority> fetchOneJob() {
+                count[0]++;
+                return Pair.of(null, JobPriority.LOW);
             }
         };
 
-        new MockUp<StatisticsUtil>() {
-            int count = 0;
-            int[] thresholds = {1, 10};
-
-            @Mock
-            public TableIf findTable(long catalogName, long dbName, long 
tblName) {
-                return tableIf;
-            }
-
-            @Mock
-            public int getAutoAnalyzeTableWidthThreshold() {
-                return thresholds[count++];
-            }
-        };
-
-        AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build();
-        StatisticsAutoCollector statisticsAutoCollector = new 
StatisticsAutoCollector();
-        
Assertions.assertNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo));
-        
Assertions.assertNotNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo));
-    }
-
-    @Test
-    public void testLoop() {
-        AtomicBoolean timeChecked = new AtomicBoolean();
-        AtomicBoolean switchChecked = new AtomicBoolean();
-        new MockUp<StatisticsUtil>() {
-
-            @Mock
-            public boolean inAnalyzeTime(LocalTime now) {
-                timeChecked.set(true);
-                return true;
-            }
-
-            @Mock
-            public boolean enableAutoAnalyze() {
-                switchChecked.set(true);
-                return true;
-            }
-        };
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        autoCollector.collect();
-        Assertions.assertTrue(timeChecked.get() && switchChecked.get());
-
-    }
-
-    @Test
-    public void checkAvailableThread() {
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
-                
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
-    }
-
-    @Test
-    public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta 
stats, @Mocked TableIf anyOtherTable) {
-        new MockUp<OlapTable>() {
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 
+ 1000000000;
-            }
-        };
-
-        new MockUp<AnalysisManager>() {
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return stats;
-            }
-        };
-        // A very huge table has been updated recently, so we should skip it 
this time
-        stats.updatedTime = System.currentTimeMillis() - 1000;
-        stats.newPartitionLoaded = new AtomicBoolean();
-        stats.newPartitionLoaded.set(true);
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        // Test new partition loaded data for the first time. Not skip.
-        Assertions.assertFalse(autoCollector.skip(olapTable));
-        stats.newPartitionLoaded.set(false);
-        // Assertions.assertTrue(autoCollector.skip(olapTable));
-        // The update of this huge table is long time ago, so we shouldn't 
skip it this time
-        stats.updatedTime = System.currentTimeMillis()
-                - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 
10000;
-        Assertions.assertFalse(autoCollector.skip(olapTable));
         new MockUp<AnalysisManager>() {
-
             @Mock
             public TableStatsMeta findTableStatsStatus(long tblId) {
+                count[1]++;
                 return null;
             }
         };
-        // can't find table stats meta, which means this table never get 
analyzed,  so we shouldn't skip it this time
-        Assertions.assertFalse(autoCollector.skip(olapTable));
-        new MockUp<AnalysisManager>() {
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return stats;
-            }
-        };
-        stats.userInjected = true;
-        Assertions.assertTrue(autoCollector.skip(olapTable));
-        // this is not olap table nor external table, so we should skip it 
this time
-        Assertions.assertTrue(autoCollector.skip(anyOtherTable));
-    }
-
-    // For small table, use full
-    @Test
-    public void testCreateAnalyzeJobForTbl1(
-            @Injectable OlapTable t1,
-            @Injectable Database db
-    ) throws Exception {
-        new MockUp<Database>() {
-
-            @Mock
-            public CatalogIf getCatalog() {
-                return Env.getCurrentInternalCatalog();
-            }
-
-            @Mock
-            public long getId() {
-                return 0;
-            }
-        };
-        new MockUp<OlapTable>() {
-
-            int count = 0;
-
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("test", 
PrimitiveType.INT));
-            }
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1;
-            }
-
-            @Mock
-            public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
-                return new OlapAnalysisTask(info);
-            }
-
-            @Mock
-            public List<Long> getMvColumnIndexIds(String columnName) {
-                ArrayList<Long> objects = new ArrayList<>();
-                objects.add(-1L);
-                return objects;
-            }
-
-            @Mock
-            public long getRowCount() {
-                return 1;
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            @Mock
-            public TableIf findTable(long catalogId, long dbId, long tblId) {
-                return t1;
-            }
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        List<AnalysisInfo> jobInfos = new ArrayList<>();
-        sac.createAnalyzeJobForTbl(db, jobInfos, t1);
-        AnalysisInfo jobInfo = jobInfos.get(0);
-        List<Pair<String, String>> columnNames = Lists.newArrayList();
-        columnNames.add(Pair.of("test", "t1"));
-        jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(columnNames).build();
-        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
-        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
-        Assertions.assertEquals(1, analysisTasks.size());
-        for (BaseAnalysisTask task : analysisTasks.values()) {
-            Assertions.assertNull(task.getTableSample());
-        }
-    }
-
-    // for big table, use sample
-    @Test
-    public void testCreateAnalyzeJobForTbl2(
-            @Injectable OlapTable t1,
-            @Injectable Database db
-    ) throws Exception {
-        new MockUp<Database>() {
-
-            @Mock
-            public CatalogIf getCatalog() {
-                return Env.getCurrentInternalCatalog();
-            }
-
-            @Mock
-            public long getId() {
-                return 0;
-            }
-        };
-        new MockUp<OlapTable>() {
-
-            int count = 0;
-
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("test", PrimitiveType.INT));
-            }
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2;
-            }
-
-            @Mock
-            public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
-                return new OlapAnalysisTask(info);
-            }
-
-            @Mock
-            public List<Long> getMvColumnIndexIds(String columnName) {
-                ArrayList<Long> objects = new ArrayList<>();
-                objects.add(-1L);
-                return objects;
-            }
-
-            @Mock
-            public long getRowCount() {
-                return 1;
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            @Mock
-            public TableIf findTable(long catalogId, long dbId, long tblId) {
-                return t1;
-            }
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        List<AnalysisInfo> jobInfos = new ArrayList<>();
-        sac.createAnalyzeJobForTbl(db, jobInfos, t1);
-        AnalysisInfo jobInfo = jobInfos.get(0);
-        List<Pair<String, String>> colNames = Lists.newArrayList();
-        colNames.add(Pair.of("test", "1"));
-        jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(colNames).build();
-        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
-        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
-        Assertions.assertEquals(1, analysisTasks.size());
-        for (BaseAnalysisTask task : analysisTasks.values()) {
-            Assertions.assertNotNull(task.getTableSample());
-        }
-    }
-
-    @Test
-    public void testDisableAuto1() throws Exception {
-        InternalCatalog catalog1 = EnvFactory.createInternalCatalog();
-        List<CatalogIf> catalogs = Lists.newArrayList();
-        catalogs.add(catalog1);
+        collector.collect();
+        Assertions.assertEquals(1, count[0]);
+        Assertions.assertEquals(0, count[1]);
 
+        OlapTable table = new OlapTable();
         new MockUp<StatisticsAutoCollector>() {
             @Mock
-            public List<CatalogIf> getCatalogsInOrder() {
-                return catalogs;
-            }
-
-            @Mock
-            protected boolean canCollect() {
-                return false;
+            protected Pair<TableIf, JobPriority> fetchOneJob() {
+                if (count[0] == 0) {
+                    count[0]++;
+                    return Pair.of(table, JobPriority.LOW);
+                }
+                count[0]++;
+                return Pair.of(null, JobPriority.LOW);
             }
-
         };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        new Expectations(catalog1) {{
-                catalog1.enableAutoAnalyze();
-                times = 0;
-            }};
-
-        sac.analyzeAll();
+        count[0] = 0;
+        count[1] = 0;
+        collector.collect();
+        Assertions.assertEquals(2, count[0]);
+        Assertions.assertEquals(1, count[1]);
     }
 
     @Test
-    public void testDisableAuto2() throws Exception {
-        InternalCatalog catalog1 = EnvFactory.createInternalCatalog();
-        List<CatalogIf> catalogs = Lists.newArrayList();
-        catalogs.add(catalog1);
-
-        Database db1 = new Database();
-        List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList();
-        dbs.add(db1);
-
-        new MockUp<StatisticsAutoCollector>() {
-            int count = 0;
-            boolean[] canCollectReturn = {true, false};
-            @Mock
-            public List<CatalogIf> getCatalogsInOrder() {
-                return catalogs;
-            }
-
-            @Mock
-            public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
-                return dbs;
-            }
-
-                @Mock
-            protected boolean canCollect() {
-                return canCollectReturn[count++];
-            }
-
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        new Expectations(catalog1, db1) {{
-                catalog1.enableAutoAnalyze();
-                result = true;
-                times = 1;
-                db1.getFullName();
-                times = 0;
-            }};
-
-        sac.analyzeAll();
+    public void testFetchOneJob() throws InterruptedException {
+        OlapTable table1 = new OlapTable();
+        OlapTable table2 = new OlapTable();
+        StatisticsAutoCollector collector = new StatisticsAutoCollector();
+        collector.appendToHighPriorityJobs(table1);
+        collector.appendToLowPriorityJobs(table2);
+        Pair<TableIf, JobPriority> jobPair = collector.fetchOneJob();
+        Assertions.assertSame(table1, jobPair.first);
+        Assertions.assertEquals(JobPriority.HIGH, jobPair.second);
+        jobPair = collector.fetchOneJob();
+        Assertions.assertSame(table2, jobPair.first);
+        Assertions.assertEquals(JobPriority.LOW, jobPair.second);
+        jobPair = collector.fetchOneJob();
+        Assertions.assertNull(jobPair.first);
     }
 
     @Test
-    public void testCreateAnalyzeJobForTbl() {
+    public void testTableRowCountReported() {
         StatisticsAutoCollector collector = new StatisticsAutoCollector();
-        OlapTable table = new OlapTable();
+        ExternalTable externalTable = new ExternalTable();
+        Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.SAMPLE));
+        OlapTable olapTable = new OlapTable();
+        Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.FULL));
+        Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.FULL));
         new MockUp<OlapTable>() {
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return 100;
-            }
-
             @Mock
             public long getRowCountForIndex(long indexId, boolean strict) {
-                return -1;
-            }
-
-            @Mock
-            public boolean isPartitionedTable() {
-                return false;
+                return TableIf.UNKNOWN_ROW_COUNT;
             }
         };
-        List<AnalysisInfo> infos = Lists.newArrayList();
-        collector.createAnalyzeJobForTbl(null, infos, table);
-        Assertions.assertEquals(0, infos.size());
+        Assertions.assertFalse(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE));
         new MockUp<OlapTable>() {
             @Mock
             public long getRowCountForIndex(long indexId, boolean strict) {
-                return 100;
+                return TableIf.UNKNOWN_ROW_COUNT + 1;
             }
         };
-        Assertions.assertThrows(NullPointerException.class, () -> collector.createAnalyzeJobForTbl(null, infos, table));
+        Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE));
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java
new file mode 100644
index 00000000000..f57791a1a5c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StatisticsJobAppenderTest {
+    @Test
+    public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) {
+        new MockUp<OlapTable>() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000;
+            }
+        };
+
+        new MockUp<AnalysisManager>() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return stats;
+            }
+        };
+        // A very huge table has been updated recently, so we should skip it this time
+        stats.updatedTime = System.currentTimeMillis() - 1000;
+        stats.newPartitionLoaded = new AtomicBoolean();
+        stats.newPartitionLoaded.set(true);
+        StatisticsJobAppender appender = new StatisticsJobAppender("appender");
+        // Test new partition loaded data for the first time. Not skip.
+        Assertions.assertFalse(appender.skip(olapTable));
+        stats.newPartitionLoaded.set(false);
+        // The update of this huge table is long time ago, so we shouldn't skip it this time
+        stats.updatedTime = System.currentTimeMillis()
+                - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000;
+        Assertions.assertFalse(appender.skip(olapTable));
+        new MockUp<AnalysisManager>() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return null;
+            }
+        };
+        // can't find table stats meta, which means this table never get analyzed,  so we shouldn't skip it this time
+        Assertions.assertFalse(appender.skip(olapTable));
+        new MockUp<AnalysisManager>() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return stats;
+            }
+        };
+        stats.userInjected = true;
+        Assertions.assertTrue(appender.skip(olapTable));
+
+        // this is not olap table nor external table, so we should skip it this time
+        Assertions.assertTrue(appender.skip(anyOtherTable));
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
index 724e0363833..0e221673e9d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
@@ -17,10 +17,20 @@
 
 package org.apache.doris.statistics.util;
 
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.ColStatsMeta;
 import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.TableStatsMeta;
 
 import com.google.common.collect.Lists;
 import mockit.Mock;
@@ -33,6 +43,8 @@ import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 class StatisticsUtilTest {
     @Test
@@ -150,4 +162,77 @@ class StatisticsUtilTest {
         // \\''""
         Assertions.assertEquals("\\\\''\"", StatisticsUtil.escapeSQL(origin));
     }
+
+    @Test
+    public void testTableNotAnalyzedForTooLong() throws InterruptedException {
+        TableStatsMeta tableMeta = new TableStatsMeta();
+        OlapTable olapTable = new OlapTable();
+        ExternalTable externalTable = new ExternalTable();
+
+        // Test table or stats is null
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(null, tableMeta));
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, null));
+
+        // Test user injected
+        tableMeta.userInjected = true;
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test External table
+        tableMeta.userInjected = false;
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(externalTable, tableMeta));
+
+        // Test config is 0
+        Config.auto_analyze_interval_seconds = 0;
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test time not long enough
+        Config.auto_analyze_interval_seconds = 86400;
+        tableMeta.lastAnalyzeTime = System.currentTimeMillis();
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test time long enough and update rows > 0
+        Config.auto_analyze_interval_seconds = 1;
+        tableMeta.lastAnalyzeTime = System.currentTimeMillis();
+        Thread.sleep(2000);
+        tableMeta.updatedRows.set(10);
+        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test row count is not equal with last analyze
+        tableMeta.updatedRows.set(0);
+        tableMeta.rowCount = 10;
+        new MockUp<Table>() {
+            @Mock
+            public long getRowCount() {
+                return 100;
+            }
+        };
+        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test visible version changed
+        new MockUp<OlapTable>() {
+            @Mock
+            public long getVisibleVersion() {
+                return 100;
+            }
+        };
+        new MockUp<Table>() {
+            @Mock
+            public long getRowCount() {
+                return 10;
+            }
+        };
+        ConcurrentMap<Pair<String, String>, ColStatsMeta> colToColStatsMeta = new ConcurrentHashMap<>();
+        ColStatsMeta col1Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
+        ColStatsMeta col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 101);
+        colToColStatsMeta.put(Pair.of("index1", "col1"), col1Meta);
+        colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
+        tableMeta.setColToColStatsMeta(colToColStatsMeta);
+        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+        // Test visible version unchanged.
+        col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
+        colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
+        tableMeta.setColToColStatsMeta(colToColStatsMeta);
+        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+    }
 }
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
index 8a34bf9204f..56f5a0bd338 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
@@ -36,13 +36,14 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
         logger.info("catalog " + catalog_name + " created")
 
         // Test analyze table without init.
-        sql """analyze database ${catalog_name}.statistics PROPERTIES("use.auto.analyzer"="true")"""
+        sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")"""
         sql """use ${catalog_name}.statistics"""
 
         for (int i = 0; i < 10; i++) {
-            Thread.sleep(1000)
+            Thread.sleep(2000)
             def result = sql """show column stats `statistics` (lo_quantity)"""
-            if (result.size <= 0) {
+            if (result.size() <= 0) {
+                sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")"""
                 continue;
             }
             assertEquals(result.size(), 1)
@@ -56,7 +57,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
             assertEquals(result[0][8], "N/A")
 
             result = sql """show column stats `statistics` (lo_orderkey)"""
-            if (result.size <= 0) {
+            if (result.size() <= 0) {
                 continue;
             }
             assertEquals(result.size(), 1)
@@ -70,7 +71,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
             assertEquals(result[0][8], "N/A")
 
             result = sql """show column stats `statistics` (lo_linenumber)"""
-            if (result.size <= 0) {
+            if (result.size() <= 0) {
                 continue;
             }
             assertEquals(result.size(), 1)
@@ -82,6 +83,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
             assertEquals(result[0][6], "4.0")
             assertEquals(result[0][7], "N/A")
             assertEquals(result[0][8], "N/A")
+            break
         }
 
         sql """drop catalog ${catalog_name}"""
diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy
index b4edc5e9d7b..c518a15d7dd 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -2837,7 +2837,7 @@ PARTITION `p599` VALUES IN (599)
 
     // Test auto analyze with job type SYSTEM
     sql """drop stats trigger_test"""
-    sql """analyze database trigger PROPERTIES("use.auto.analyzer"="true")"""
+    sql """analyze table trigger_test PROPERTIES("use.auto.analyzer"="true")"""
     int i = 0;
     for (0; i < 10; i++) {
         result = sql """show column stats trigger_test"""
@@ -2941,7 +2941,38 @@ PARTITION `p599` VALUES IN (599)
     new_part_result = sql """show column stats part(colint)"""
     assertEquals("2.0", new_part_result[0][2])
 
-
     sql """DROP DATABASE IF EXISTS trigger"""
+
+    // Test show last analyze table version
+    sql """create database if not exists test_version"""
+    sql """use test_version"""
+    sql """drop table if exists region"""
+    sql """
+        CREATE TABLE region  (
+            r_regionkey      int NOT NULL,
+            r_name       VARCHAR(25) NOT NULL,
+            r_comment    VARCHAR(152)
+        )ENGINE=OLAP
+        DUPLICATE KEY(`r_regionkey`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+    sql """analyze table region with sync"""
+    def versionResult = sql """show column stats region"""
+    assertEquals(versionResult[0][14], "1")
+    assertEquals(versionResult[1][14], "1")
+    assertEquals(versionResult[2][14], "1")
+
+    sql """insert into region values (1, "1", "1")"""
+    sql """analyze table region with sync"""
+    versionResult = sql """show column stats region"""
+    assertEquals(versionResult[0][14], "2")
+    assertEquals(versionResult[1][14], "2")
+    assertEquals(versionResult[2][14], "2")
+
+    sql """drop database if exists test_version"""
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to