This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 72b709d6a9 [opt](stats) split period collector from auto collector (#23622)
72b709d6a9 is described below

commit 72b709d6a9ee2a6637c78a1a9a7f0c3b49b69a2b
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Sep 4 17:04:16 2023 +0800

    [opt](stats) split period collector from auto collector (#23622)
    
    1. Split periodic analysis out of the auto collector
    2. Analyze tables incrementally by default
    3. Rename StatisticsAutoAnalyzer to StatisticsAutoCollector
---
 .../main/java/org/apache/doris/common/Config.java  |  3 +
 .../main/java/org/apache/doris/catalog/Env.java    | 21 +++---
 .../java/org/apache/doris/catalog/OlapTable.java   |  3 +-
 .../main/java/org/apache/doris/catalog/Table.java  |  2 +-
 .../java/org/apache/doris/catalog/TableIf.java     |  2 +-
 .../doris/catalog/external/ExternalTable.java      |  2 +-
 .../org/apache/doris/statistics/AnalysisInfo.java  |  1 +
 .../apache/doris/statistics/AnalysisManager.java   | 46 +++++++-----
 .../doris/statistics/AnalysisTaskExecutor.java     | 20 ++++--
 .../apache/doris/statistics/OlapAnalysisTask.java  |  6 +-
 ...oAnalyzer.java => StatisticsAutoCollector.java} | 69 +++---------------
 .../doris/statistics/StatisticsCollector.java      | 81 ++++++++++++++++++++++
 .../statistics/StatisticsPeriodCollector.java      | 50 +++++++++++++
 .../org/apache/doris/statistics/TableStats.java    |  2 +-
 ...rTest.java => StatisticsAutoCollectorTest.java} | 20 +++---
 .../suites/statistics/analyze_stats.groovy         |  8 +++
 16 files changed, 229 insertions(+), 107 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d13fc72e7e..2c6087cd28 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2103,6 +2103,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static int full_auto_analyze_simultaneously_running_task_num = 1;
 
+    @ConfField
+    public static final int period_analyze_simultaneously_running_task_num = 1;
+
     @ConfField
     public static int cpu_resource_limit_per_analyze_task = 1;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 71d1ef1d8d..8f9ca541a8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -224,9 +224,10 @@ import 
org.apache.doris.scheduler.registry.TimerJobRegister;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
-import org.apache.doris.statistics.StatisticsAutoAnalyzer;
+import org.apache.doris.statistics.StatisticsAutoCollector;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsPeriodCollector;
 import org.apache.doris.statistics.query.QueryStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -476,7 +477,9 @@ public class Env {
      */
     private final LoadManagerAdapter loadManagerAdapter;
 
-    private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
+    private StatisticsAutoCollector statisticsAutoCollector;
+
+    private StatisticsPeriodCollector statisticsPeriodCollector;
 
     private HiveTransactionMgr hiveTransactionMgr;
 
@@ -701,7 +704,8 @@ public class Env {
         this.extMetaCacheMgr = new ExternalMetaCacheMgr();
         this.analysisManager = new AnalysisManager();
         this.statisticsCleaner = new StatisticsCleaner();
-        this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer();
+        this.statisticsAutoCollector = new StatisticsAutoCollector();
+        this.statisticsPeriodCollector = new StatisticsPeriodCollector();
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.queryStats = new QueryStats();
@@ -948,8 +952,11 @@ public class Env {
         if (statisticsCleaner != null) {
             statisticsCleaner.start();
         }
-        if (statisticsAutoAnalyzer != null) {
-            statisticsAutoAnalyzer.start();
+        if (statisticsAutoCollector != null) {
+            statisticsAutoCollector.start();
+        }
+        if (statisticsPeriodCollector != null) {
+            statisticsPeriodCollector.start();
         }
     }
 
@@ -5578,10 +5585,6 @@ public class Env {
         return loadManagerAdapter;
     }
 
-    public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
-        return statisticsAutoAnalyzer;
-    }
-
     public QueryStats getQueryStats() {
         return queryStats;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 39b5450e23..6fc6bb2ed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1137,7 +1137,8 @@ public class OlapTable extends Table {
     }
 
     @Override
-    public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+    public Set<String> findReAnalyzeNeededPartitions() {
+        TableStats tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
         if (tableStats == null) {
             return getPartitionNames().stream().map(this::getPartition)
                 
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 12689894b4..ef71e394e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -580,7 +580,7 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
     }
 
     @Override
-    public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+    public Set<String> findReAnalyzeNeededPartitions() {
         return Collections.emptySet();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 21e2ddd154..ae67d0c9e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -139,7 +139,7 @@ public interface TableIf {
 
     boolean needReAnalyzeTable(TableStats tblStats);
 
-    Set<String> findReAnalyzeNeededPartitions(TableStats tableStats);
+    Set<String> findReAnalyzeNeededPartitions();
 
     void write(DataOutput out) throws IOException;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 6f31ac18d7..ca5b80bc43 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -388,7 +388,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
     }
 
     @Override
-    public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+    public Set<String> findReAnalyzeNeededPartitions() {
         HashSet<String> partitions = Sets.newHashSet();
         // TODO: Find a way to collect external table partitions that need to 
be analyzed.
         partitions.add("Dummy Partition");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 8f33480640..c20bad6396 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -96,6 +96,7 @@ public class AnalysisInfo implements Writable {
     @SerializedName("tblName")
     public final String tblName;
 
+    // TODO: Map here is weird, List is enough
     @SerializedName("colToPartitions")
     public final Map<String, Set<String>> colToPartitions;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index bdd325e6d1..853e9b3393 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
@@ -363,6 +364,9 @@ public class AnalysisManager extends Daemon implements 
Writable {
         ShowResultSetMetaData commonResultSetMetaData = new 
ShowResultSetMetaData(columns);
         List<List<String>> resultRows = new ArrayList<>();
         for (AnalysisInfo analysisInfo : analysisInfos) {
+            if (analysisInfo == null) {
+                continue;
+            }
             List<String> row = new ArrayList<>();
             row.add(analysisInfo.catalogName);
             row.add(analysisInfo.dbName);
@@ -442,23 +446,9 @@ public class AnalysisManager extends Daemon implements 
Writable {
             StatisticsRepository.dropStatistics(invalidPartIds);
         }
 
-        if (analysisMode == AnalysisMode.INCREMENTAL && analysisType == 
AnalysisType.FUNDAMENTALS) {
-            existColAndPartsForStats.values().forEach(partIds -> 
partIds.removeAll(invalidPartIds));
-            // In incremental collection mode, just collect the uncollected 
partition statistics
-            existColAndPartsForStats.forEach((columnName, partitionIds) -> {
-                Set<String> existPartitions = partitionIds.stream()
-                        .map(idToPartition::get)
-                        .collect(Collectors.toSet());
-                columnToPartitions.computeIfPresent(columnName, (colName, 
partNames) -> {
-                    partNames.removeAll(existPartitions);
-                    return partNames;
-                });
-            });
-            if (invalidPartIds.isEmpty()) {
-                // There is no invalid statistics, so there is no need to 
update table statistics,
-                // remove columns that do not require re-collection of 
statistics
-                columnToPartitions.entrySet().removeIf(entry -> 
entry.getValue().isEmpty());
-            }
+        if (analysisType == AnalysisType.FUNDAMENTALS) {
+            Set<String> reAnalyzeNeededPartitions = 
findReAnalyzeNeededPartitions(table);
+            columnToPartitions.replaceAll((k, v) -> reAnalyzeNeededPartitions);
         }
 
         return columnToPartitions;
@@ -692,6 +682,12 @@ public class AnalysisManager extends Daemon implements 
Writable {
         }
         Set<String> cols = dropStatsStmt.getColumnNames();
         long tblId = dropStatsStmt.getTblId();
+        TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
+        if (tableStats == null) {
+            return;
+        }
+        tableStats.updatedTime = 0;
+        replayUpdateTableStatsStatus(tableStats);
         StatisticsRepository.dropStatistics(tblId, cols);
         for (String col : cols) {
             Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, 
col);
@@ -951,4 +947,20 @@ public class AnalysisManager extends Daemon implements 
Writable {
         systemJobInfoMap.put(jobInfo.jobId, jobInfo);
         analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
     }
+
+    @VisibleForTesting
+    protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
+        TableStats tableStats = findTableStatsStatus(table.getId());
+        if (tableStats == null) {
+            return table.getPartitionNames().stream().map(table::getPartition)
+                    
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
+        }
+        return table.getPartitionNames().stream()
+                .map(table::getPartition)
+                .filter(Partition::hasData)
+                .filter(partition ->
+                        partition.getVisibleVersionTime() >= 
tableStats.updatedTime).map(Partition::getName)
+                .collect(Collectors.toSet());
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index fb23050fff..a7b0073bb4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
@@ -42,16 +43,23 @@ public class AnalysisTaskExecutor extends Thread {
                     
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
     public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
-        executors = ThreadPoolManager.newDaemonThreadPool(
-                simultaneouslyRunningTaskNum,
-                simultaneouslyRunningTaskNum, 0,
-                TimeUnit.DAYS, new LinkedBlockingQueue<>(),
-                new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
-                "Analysis Job Executor", true);
+        if (!Env.isCheckpointThread()) {
+            executors = ThreadPoolManager.newDaemonThreadPool(
+                    simultaneouslyRunningTaskNum,
+                    simultaneouslyRunningTaskNum, 0,
+                    TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+                    new BlockedPolicy("Analysis Job Executor", 
Integer.MAX_VALUE),
+                    "Analysis Job Executor", true);
+        } else {
+            executors = null;
+        }
     }
 
     @Override
     public void run() {
+        if (Env.isCheckpointThread()) {
+            return;
+        }
         cancelExpiredTask();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index ef26d7349e..9d34b6aabd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -74,6 +74,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
     }
 
     public void doExecute() throws Exception {
+        Set<String> partitionNames = info.colToPartitions.get(info.colName);
+        if (partitionNames.isEmpty()) {
+            return;
+        }
         Map<String, String> params = new HashMap<>();
         params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
         params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
@@ -90,7 +94,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         List<String> partitionAnalysisSQLs = new ArrayList<>();
         try {
             tbl.readLock();
-            Set<String> partitionNames = 
info.colToPartitions.get(info.colName);
+
             for (String partitionName : partitionNames) {
                 Partition part = tbl.getPartition(partitionName);
                 if (part == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
similarity index 74%
rename from 
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 5d704e4f3b..c7310fc0e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -24,8 +24,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -41,43 +39,27 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public class StatisticsAutoAnalyzer extends MasterDaemon {
+public class StatisticsAutoCollector extends StatisticsCollector {
 
-    private static final Logger LOG = 
LogManager.getLogger(StatisticsAutoAnalyzer.class);
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsAutoCollector.class);
 
-    private final AnalysisTaskExecutor analysisTaskExecutor;
-
-    public StatisticsAutoAnalyzer() {
+    public StatisticsAutoCollector() {
         super("Automatic Analyzer",
-                
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2);
-        analysisTaskExecutor = new 
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
-        analysisTaskExecutor.start();
+                
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
+                new 
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
     }
 
     @Override
-    protected void runAfterCatalogReady() {
-        if (!Env.getCurrentEnv().isMaster()) {
-            return;
-        }
-        if (!StatisticsUtil.statsTblAvailable()) {
-            return;
-        }
-        analyzePeriodically();
+    protected void collect() {
         if 
(!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
             return;
         }
-
-        if (!analysisTaskExecutor.idle()) {
-            return;
-        }
-
         if (Config.enable_full_auto_analyze) {
             analyzeAll();
         }
@@ -141,29 +123,18 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
         return analysisInfos;
     }
 
-    private void analyzePeriodically() {
-        try {
-            AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-            List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
-            for (AnalysisInfo jobInfo : jobInfos) {
-                createSystemAnalysisJob(jobInfo);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to periodically analyze the statistics." + e);
-        }
-    }
-
     @VisibleForTesting
     protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
         TableIf table = StatisticsUtil
                 .findTable(jobInfo.catalogName, jobInfo.dbName, 
jobInfo.tblName);
-        TableStats tblStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+        AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
+        TableStats tblStats = 
analysisManager.findTableStatsStatus(table.getId());
 
         if (!(tblStats == null || table.needReAnalyzeTable(tblStats))) {
             return null;
         }
 
-        Set<String> needRunPartitions = 
table.findReAnalyzeNeededPartitions(tblStats);
+        Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions();
 
         if (needRunPartitions.isEmpty()) {
             return null;
@@ -173,7 +144,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
     }
 
     @VisibleForTesting
-    public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
+    protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf 
table,
             Set<String> needRunPartitions) {
         Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
         Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
@@ -209,24 +180,4 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             return true;
         }
     }
-
-
-    // Analysis job created by the system
-    @VisibleForTesting
-    protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
-            throws DdlException {
-        if (jobInfo.colToPartitions.isEmpty()) {
-            // No statistics need to be collected or updated
-            return;
-        }
-
-        Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
-        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, 
false);
-        if (StatisticsUtil.isExternalTable(jobInfo.catalogName, 
jobInfo.dbName, jobInfo.tblName)) {
-            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTaskInfos, false);
-        }
-        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTaskInfos);
-        analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
new file mode 100644
index 0000000000..f65829e748
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class StatisticsCollector extends MasterDaemon {
+
+
+    protected final AnalysisTaskExecutor analysisTaskExecutor;
+
+
+    public StatisticsCollector(String name, long intervalMs, 
AnalysisTaskExecutor analysisTaskExecutor) {
+        super(name, intervalMs);
+        this.analysisTaskExecutor = analysisTaskExecutor;
+        analysisTaskExecutor.start();
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (!StatisticsUtil.statsTblAvailable()) {
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+
+        if (!analysisTaskExecutor.idle()) {
+            return;
+        }
+        collect();
+    }
+
+    protected abstract void collect();
+
+    // Analysis job created by the system
+    @VisibleForTesting
+    protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
+            throws DdlException {
+        if (jobInfo.colToPartitions.isEmpty()) {
+            // No statistics need to be collected or updated
+            return;
+        }
+
+        Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, 
false);
+        if (StatisticsUtil.isExternalTable(jobInfo.catalogName, 
jobInfo.dbName, jobInfo.tblName)) {
+            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTaskInfos, false);
+        }
+        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTaskInfos);
+        analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
new file mode 100644
index 0000000000..f34ad0f122
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class StatisticsPeriodCollector extends StatisticsCollector {
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsPeriodCollector.class);
+
+    public StatisticsPeriodCollector() {
+        super("Automatic Analyzer",
+                
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
+                new 
AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
+    }
+
+    @Override
+    protected void collect() {
+        try {
+            AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+            List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
+            for (AnalysisInfo jobInfo : jobInfos) {
+                createSystemAnalysisJob(jobInfo);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to periodically analyze the statistics." + e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
index 0fffbd9dd7..48a8bd81c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
@@ -55,7 +55,7 @@ public class TableStats implements Writable {
     public final AnalysisType analysisType;
 
     @SerializedName("updateTime")
-    public final long updatedTime;
+    public long updatedTime;
 
     @SerializedName("columns")
     public String columns;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
similarity index 90%
rename from 
fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
rename to 
fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
index fff649a447..d152e8175f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -49,7 +49,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class StatisticsAutoAnalyzerTest {
+public class StatisticsAutoCollectorTest {
 
     @Test
     public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
@@ -65,7 +65,7 @@ public class StatisticsAutoAnalyzerTest {
                 return databaseIfs;
             }
         };
-        new MockUp<StatisticsAutoAnalyzer>() {
+        new MockUp<StatisticsAutoCollector>() {
             @Mock
             public List<AnalysisInfo> 
constructAnalysisInfo(DatabaseIf<TableIf> db) {
                 return Arrays.asList(analysisInfo, analysisInfo);
@@ -85,7 +85,7 @@ public class StatisticsAutoAnalyzerTest {
             }
         };
 
-        StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer();
+        StatisticsAutoCollector saa = new StatisticsAutoCollector();
         saa.runAfterCatalogReady();
         new Expectations() {
             {
@@ -131,7 +131,7 @@ public class StatisticsAutoAnalyzerTest {
                 return columns;
             }
         };
-        StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer();
+        StatisticsAutoCollector saa = new StatisticsAutoCollector();
         List<AnalysisInfo> analysisInfos =
                 saa.constructAnalysisInfo(new Database(1, "anydb"));
         Assertions.assertEquals(1, analysisInfos.size());
@@ -145,7 +145,7 @@ public class StatisticsAutoAnalyzerTest {
 
         new MockUp<OlapTable>() {
             @Mock
-            protected Set<String> findReAnalyzeNeededPartitions(TableStats 
tableStats) {
+            protected Set<String> findReAnalyzeNeededPartitions() {
                 Set<String> partitionNames = new HashSet<>();
                 partitionNames.add("p1");
                 partitionNames.add("p2");
@@ -185,20 +185,20 @@ public class StatisticsAutoAnalyzerTest {
             }
         };
 
-        new MockUp<StatisticsAutoAnalyzer>() {
+        new MockUp<StatisticsAutoCollector>() {
             @Mock
             public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, 
TableIf table,
                     Set<String> needRunPartitions) {
                 return new AnalysisInfoBuilder().build();
             }
         };
-        StatisticsAutoAnalyzer statisticsAutoAnalyzer = new 
StatisticsAutoAnalyzer();
+        StatisticsAutoCollector statisticsAutoCollector = new 
StatisticsAutoCollector();
         AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
                 .setCatalogName("cname")
                 .setDbName("db")
                 .setTblName("tbl").build();
-        
Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
-        
Assertions.assertNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
-        
Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
+        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+        
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
     }
 }
diff --git a/regression-test/suites/statistics/analyze_stats.groovy 
b/regression-test/suites/statistics/analyze_stats.groovy
index 50420613b5..3220a34ee5 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,14 @@ suite("test_analyze") {
     String db = "regression_test_statistics"
     String tbl = "analyzetestlimited_duplicate_all"
 
+    sql """
+        DROP DATABASE IF EXISTS `${db}`
+    """
+
+    sql """
+        CREATE DATABASE `${db}`
+    """
+
     sql """
         DROP TABLE IF EXISTS `${tbl}`
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to