This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 72b709d6a9 [opt](stats) split period collector from auto collector (#23622) 72b709d6a9 is described below commit 72b709d6a9ee2a6637c78a1a9a7f0c3b49b69a2b Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Mon Sep 4 17:04:16 2023 +0800 [opt](stats) split period collector from auto collector (#23622) 1. Split period analyze from auto collector 2. Analyze table incrementally by default 3. Rename StatisticsAutoAnalyzer to StatisticsAutoCollector --- .../main/java/org/apache/doris/common/Config.java | 3 + .../main/java/org/apache/doris/catalog/Env.java | 21 +++--- .../java/org/apache/doris/catalog/OlapTable.java | 3 +- .../main/java/org/apache/doris/catalog/Table.java | 2 +- .../java/org/apache/doris/catalog/TableIf.java | 2 +- .../doris/catalog/external/ExternalTable.java | 2 +- .../org/apache/doris/statistics/AnalysisInfo.java | 1 + .../apache/doris/statistics/AnalysisManager.java | 46 +++++++----- .../doris/statistics/AnalysisTaskExecutor.java | 20 ++++-- .../apache/doris/statistics/OlapAnalysisTask.java | 6 +- ...oAnalyzer.java => StatisticsAutoCollector.java} | 69 +++--------------- .../doris/statistics/StatisticsCollector.java | 81 ++++++++++++++++++++++ .../statistics/StatisticsPeriodCollector.java | 50 +++++++++++++ .../org/apache/doris/statistics/TableStats.java | 2 +- ...rTest.java => StatisticsAutoCollectorTest.java} | 20 +++--- .../suites/statistics/analyze_stats.groovy | 8 +++ 16 files changed, 229 insertions(+), 107 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d13fc72e7e..2c6087cd28 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2103,6 +2103,9 @@ public class Config extends ConfigBase { @ConfField public static int full_auto_analyze_simultaneously_running_task_num = 1; + @ConfField + public static final int period_analyze_simultaneously_running_task_num = 1; + @ConfField public static int cpu_resource_limit_per_analyze_task = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 71d1ef1d8d..8f9ca541a8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -224,9 +224,10 @@ import org.apache.doris.scheduler.registry.TimerJobRegister; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; -import org.apache.doris.statistics.StatisticsAutoAnalyzer; +import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; +import org.apache.doris.statistics.StatisticsPeriodCollector; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -476,7 +477,9 @@ public class Env { */ private final LoadManagerAdapter loadManagerAdapter; - private StatisticsAutoAnalyzer statisticsAutoAnalyzer; + private StatisticsAutoCollector statisticsAutoCollector; + + private StatisticsPeriodCollector statisticsPeriodCollector; private HiveTransactionMgr hiveTransactionMgr; @@ -701,7 +704,8 @@ public class Env { this.extMetaCacheMgr = new ExternalMetaCacheMgr(); this.analysisManager = new AnalysisManager(); this.statisticsCleaner = new StatisticsCleaner(); - this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer(); + this.statisticsAutoCollector = new StatisticsAutoCollector(); + this.statisticsPeriodCollector = new StatisticsPeriodCollector(); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.queryStats = new QueryStats(); @@ -948,8 +952,11 @@ public class Env { if (statisticsCleaner != null) { statisticsCleaner.start(); } - if (statisticsAutoAnalyzer != null) { - statisticsAutoAnalyzer.start(); + if (statisticsAutoCollector != null) { + statisticsAutoCollector.start(); + } + if (statisticsPeriodCollector != null) { + statisticsPeriodCollector.start(); } } @@ -5578,10 +5585,6 @@ public class Env { return loadManagerAdapter; } - public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() { - return statisticsAutoAnalyzer; - } - public QueryStats getQueryStats() { return queryStats; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 39b5450e23..6fc6bb2ed5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1137,7 +1137,8 @@ public class OlapTable extends Table { } @Override - public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) { + public Set<String> findReAnalyzeNeededPartitions() { + TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId()); if (tableStats == null) { return getPartitionNames().stream().map(this::getPartition) .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 12689894b4..ef71e394e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -580,7 +580,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { } @Override - public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) { + public Set<String> findReAnalyzeNeededPartitions() { return Collections.emptySet(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 21e2ddd154..ae67d0c9e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -139,7 +139,7 @@ public interface TableIf { boolean needReAnalyzeTable(TableStats tblStats); - Set<String> findReAnalyzeNeededPartitions(TableStats tableStats); + Set<String> findReAnalyzeNeededPartitions(); void write(DataOutput out) throws IOException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 6f31ac18d7..ca5b80bc43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -388,7 +388,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { } @Override - public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) { + public Set<String> findReAnalyzeNeededPartitions() { HashSet<String> partitions = Sets.newHashSet(); // TODO: Find a way to collect external table partitions that need to be analyzed. partitions.add("Dummy Partition"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index 8f33480640..c20bad6396 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -96,6 +96,7 @@ public class AnalysisInfo implements Writable { @SerializedName("tblName") public final String tblName; + // TODO: Map here is wired, List is enough @SerializedName("colToPartitions") public final Map<String, Set<String>> colToPartitions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index bdd325e6d1..853e9b3393 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; @@ -363,6 +364,9 @@ public class AnalysisManager extends Daemon implements Writable { ShowResultSetMetaData commonResultSetMetaData = new ShowResultSetMetaData(columns); List<List<String>> resultRows = new ArrayList<>(); for (AnalysisInfo analysisInfo : analysisInfos) { + if (analysisInfo == null) { + continue; + } List<String> row = new ArrayList<>(); row.add(analysisInfo.catalogName); row.add(analysisInfo.dbName); @@ -442,23 +446,9 @@ public class AnalysisManager extends Daemon implements Writable { StatisticsRepository.dropStatistics(invalidPartIds); } - if (analysisMode == AnalysisMode.INCREMENTAL && analysisType == AnalysisType.FUNDAMENTALS) { - existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds)); - // In incremental collection mode, just collect the uncollected partition statistics - existColAndPartsForStats.forEach((columnName, partitionIds) -> { - Set<String> existPartitions = partitionIds.stream() - .map(idToPartition::get) - .collect(Collectors.toSet()); - columnToPartitions.computeIfPresent(columnName, (colName, partNames) -> { - partNames.removeAll(existPartitions); - return partNames; - }); - }); - if (invalidPartIds.isEmpty()) { - // There is no invalid statistics, so there is no need to update table statistics, - // remove columns that do not require re-collection of statistics - columnToPartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - } + if (analysisType == AnalysisType.FUNDAMENTALS) { + Set<String> reAnalyzeNeededPartitions = findReAnalyzeNeededPartitions(table); + columnToPartitions.replaceAll((k, v) -> reAnalyzeNeededPartitions); } return columnToPartitions; @@ -692,6 +682,12 @@ public class AnalysisManager extends Daemon implements Writable { } Set<String> cols = dropStatsStmt.getColumnNames(); long tblId = dropStatsStmt.getTblId(); + TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId()); + if (tableStats == null) { + return; + } + tableStats.updatedTime = 0; + replayUpdateTableStatsStatus(tableStats); StatisticsRepository.dropStatistics(tblId, cols); for (String col : cols) { Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col); @@ -951,4 +947,20 @@ public class AnalysisManager extends Daemon implements Writable { systemJobInfoMap.put(jobInfo.jobId, jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos); } + + @VisibleForTesting + protected Set<String> findReAnalyzeNeededPartitions(TableIf table) { + TableStats tableStats = findTableStatsStatus(table.getId()); + if (tableStats == null) { + return table.getPartitionNames().stream().map(table::getPartition) + .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); + } + return table.getPartitionNames().stream() + .map(table::getPartition) + .filter(Partition::hasData) + .filter(partition -> + partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName) + .collect(Collectors.toSet()); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index fb23050fff..a7b0073bb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -17,6 +17,7 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; @@ -42,16 +43,23 @@ public class AnalysisTaskExecutor extends Thread { Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) { - executors = ThreadPoolManager.newDaemonThreadPool( - simultaneouslyRunningTaskNum, - simultaneouslyRunningTaskNum, 0, - TimeUnit.DAYS, new LinkedBlockingQueue<>(), - new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), - "Analysis Job Executor", true); + if (!Env.isCheckpointThread()) { + executors = ThreadPoolManager.newDaemonThreadPool( + simultaneouslyRunningTaskNum, + simultaneouslyRunningTaskNum, 0, + TimeUnit.DAYS, new LinkedBlockingQueue<>(), + new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), + "Analysis Job Executor", true); + } else { + executors = null; + } } @Override public void run() { + if (Env.isCheckpointThread()) { + return; + } cancelExpiredTask(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index ef26d7349e..9d34b6aabd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -74,6 +74,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask { } public void doExecute() throws Exception { + Set<String> partitionNames = info.colToPartitions.get(info.colName); + if (partitionNames.isEmpty()) { + return; + } Map<String, String> params = new HashMap<>(); params.put("internalDB", FeConstants.INTERNAL_DB_NAME); params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); @@ -90,7 +94,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask { List<String> partitionAnalysisSQLs = new ArrayList<>(); try { tbl.readLock(); - Set<String> partitionNames = info.colToPartitions.get(info.colName); + for (String partitionName : partitionNames) { Partition part = tbl.getPartition(partitionName); if (part == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java similarity index 74% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 5d704e4f3b..c7310fc0e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -24,8 +24,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -41,43 +39,27 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class StatisticsAutoAnalyzer extends MasterDaemon { +public class StatisticsAutoCollector extends StatisticsCollector { - private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class); + private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class); - private final AnalysisTaskExecutor analysisTaskExecutor; - - public StatisticsAutoAnalyzer() { + public StatisticsAutoCollector() { super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2); - analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num); - analysisTaskExecutor.start(); + TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2, + new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num)); } @Override - protected void runAfterCatalogReady() { - if (!Env.getCurrentEnv().isMaster()) { - return; - } - if (!StatisticsUtil.statsTblAvailable()) { - return; - } - analyzePeriodically(); + protected void collect() { if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { return; } - - if (!analysisTaskExecutor.idle()) { - return; - } - if (Config.enable_full_auto_analyze) { analyzeAll(); } @@ -141,29 +123,18 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { return analysisInfos; } - private void analyzePeriodically() { - try { - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs(); - for (AnalysisInfo jobInfo : jobInfos) { - createSystemAnalysisJob(jobInfo); - } - } catch (Exception e) { - LOG.warn("Failed to periodically analyze the statistics." + e); - } - } - @VisibleForTesting protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { TableIf table = StatisticsUtil .findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName); - TableStats tblStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); + AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); + TableStats tblStats = analysisManager.findTableStatsStatus(table.getId()); if (!(tblStats == null || table.needReAnalyzeTable(tblStats))) { return null; } - Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions(tblStats); + Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions(); if (needRunPartitions.isEmpty()) { return null; @@ -173,7 +144,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { } @VisibleForTesting - public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, + protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, Set<String> needRunPartitions) { Map<String, Set<String>> newColToPartitions = Maps.newHashMap(); Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions; @@ -209,24 +180,4 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { return true; } } - - - // Analysis job created by the system - @VisibleForTesting - protected void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - if (jobInfo.colToPartitions.isEmpty()) { - // No statistics need to be collected or updated - return; - } - - Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false); - if (StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) { - analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false); - } - Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos); - analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java new file mode 100644 index 0000000000..f65829e748 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.hudi.common.util.VisibleForTesting; + +import java.util.HashMap; +import java.util.Map; + +public abstract class StatisticsCollector extends MasterDaemon { + + + protected final AnalysisTaskExecutor analysisTaskExecutor; + + + public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { + super(name, intervalMs); + this.analysisTaskExecutor = analysisTaskExecutor; + analysisTaskExecutor.start(); + } + + @Override + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + return; + } + if (Env.isCheckpointThread()) { + return; + } + + if (!analysisTaskExecutor.idle()) { + return; + } + collect(); + } + + protected abstract void collect(); + + // Analysis job created by the system + @VisibleForTesting + protected void createSystemAnalysisJob(AnalysisInfo jobInfo) + throws DdlException { + if (jobInfo.colToPartitions.isEmpty()) { + // No statistics need to be collected or updated + return; + } + + Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>(); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false); + if (StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) { + analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false); + } + Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos); + analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java new file mode 100644 index 0000000000..f34ad0f122 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class StatisticsPeriodCollector extends StatisticsCollector { + private static final Logger LOG = LogManager.getLogger(StatisticsPeriodCollector.class); + + public StatisticsPeriodCollector() { + super("Automatic Analyzer", + TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2, + new AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num)); + } + + @Override + protected void collect() { + try { + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs(); + for (AnalysisInfo jobInfo : jobInfos) { + createSystemAnalysisJob(jobInfo); + } + } catch (Exception e) { + LOG.warn("Failed to periodically analyze the statistics." + e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java index 0fffbd9dd7..48a8bd81c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java @@ -55,7 +55,7 @@ public class TableStats implements Writable { public final AnalysisType analysisType; @SerializedName("updateTime") - public final long updatedTime; + public long updatedTime; @SerializedName("columns") public String columns; diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java similarity index 90% rename from fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java rename to fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index fff649a447..d152e8175f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -49,7 +49,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -public class StatisticsAutoAnalyzerTest { +public class StatisticsAutoCollectorTest { @Test public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) { @@ -65,7 +65,7 @@ public class StatisticsAutoAnalyzerTest { return databaseIfs; } }; - new MockUp<StatisticsAutoAnalyzer>() { + new MockUp<StatisticsAutoCollector>() { @Mock public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<TableIf> db) { return Arrays.asList(analysisInfo, analysisInfo); @@ -85,7 +85,7 @@ public class StatisticsAutoAnalyzerTest { } }; - StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer(); + StatisticsAutoCollector saa = new StatisticsAutoCollector(); saa.runAfterCatalogReady(); new Expectations() { { @@ -131,7 +131,7 @@ public class StatisticsAutoAnalyzerTest { return columns; } }; - StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer(); + StatisticsAutoCollector saa = new StatisticsAutoCollector(); List<AnalysisInfo> analysisInfos = saa.constructAnalysisInfo(new Database(1, "anydb")); Assertions.assertEquals(1, analysisInfos.size()); @@ -145,7 +145,7 @@ public class StatisticsAutoAnalyzerTest { new MockUp<OlapTable>() { @Mock - protected Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) { + protected Set<String> findReAnalyzeNeededPartitions() { Set<String> partitionNames = new HashSet<>(); partitionNames.add("p1"); partitionNames.add("p2"); @@ -185,20 +185,20 @@ public class StatisticsAutoAnalyzerTest { } }; - new MockUp<StatisticsAutoAnalyzer>() { + new MockUp<StatisticsAutoCollector>() { @Mock public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, Set<String> needRunPartitions) { return new AnalysisInfoBuilder().build(); } }; - StatisticsAutoAnalyzer statisticsAutoAnalyzer = new StatisticsAutoAnalyzer(); + StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder() .setCatalogName("cname") .setDbName("db") .setTblName("tbl").build(); - Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2)); - Assertions.assertNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2)); - Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2)); + Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); + Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); + Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); } } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 50420613b5..3220a34ee5 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -19,6 +19,14 @@ suite("test_analyze") { String db = "regression_test_statistics" String tbl = "analyzetestlimited_duplicate_all" + sql """ + DROP DATABASE IF EXISTS `${db}` + """ + + sql """ + CREATE DATABASE `${db}` + """ + sql """ DROP TABLE IF EXISTS `${tbl}` """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org