This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 36695e871ad [feature](statistics)Support auto analyze columns that haven't been analyzed for a long time. #42399 (#45250) 36695e871ad is described below commit 36695e871ad64d765530f2ca54bd94923800fb75 Author: James <lijib...@selectdb.com> AuthorDate: Thu Dec 12 01:57:44 2024 +0800 [feature](statistics)Support auto analyze columns that haven't been analyzed for a long time. #42399 (#45250) backport: https://github.com/apache/doris/pull/42399 --- .../main/java/org/apache/doris/common/Config.java | 7 + .../org/apache/doris/analysis/ShowAnalyzeStmt.java | 1 + .../apache/doris/analysis/ShowColumnStatsStmt.java | 2 + .../apache/doris/analysis/ShowTableStatsStmt.java | 11 +- .../main/java/org/apache/doris/catalog/Env.java | 22 +- .../org/apache/doris/datasource/ExternalTable.java | 2 +- .../java/org/apache/doris/qe/ShowExecutor.java | 1 + .../java/org/apache/doris/qe/StmtExecutor.java | 2 +- .../org/apache/doris/statistics/AnalysisInfo.java | 13 +- .../doris/statistics/AnalysisInfoBuilder.java | 17 +- .../apache/doris/statistics/AnalysisManager.java | 27 +- .../doris/statistics/AnalysisTaskExecutor.java | 5 +- .../org/apache/doris/statistics/ColStatsMeta.java | 6 +- .../org/apache/doris/statistics/JobPriority.java | 25 + .../doris/statistics/StatisticsAutoCollector.java | 285 ++++++------ .../doris/statistics/StatisticsCollector.java | 79 ---- .../doris/statistics/StatisticsJobAppender.java | 152 +++++++ .../apache/doris/statistics/TableStatsMeta.java | 16 +- .../doris/statistics/util/StatisticsUtil.java | 35 ++ .../statistics/StatisticsAutoCollectorTest.java | 502 ++------------------- .../statistics/StatisticsJobAppenderTest.java | 84 ++++ .../doris/statistics/util/StatisticsUtilTest.java | 85 ++++ .../hive/test_hive_statistic_auto.groovy | 12 +- .../suites/statistics/analyze_stats.groovy | 35 +- 24 files changed, 700 insertions(+), 726 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 359d193a5ba..ec09c850805 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2764,6 +2764,13 @@ public class Config extends ConfigBase { public static boolean enable_proxy_protocol = false; public static int profile_async_collect_expire_time_secs = 5; + @ConfField(mutable = true, description = { + "内表自动收集时间间隔,当某一列上次收集时间距离当前时间大于该值,则会触发一次新的收集,0表示不会触发。", + "Columns that have not been collected within the specified interval will trigger automatic analyze. " + + "0 means not trigger." + }) + public static long auto_analyze_interval_seconds = 86400; + //========================================================================== // begin of cloud config diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java index 9ccfd956ca5..f660d6eeb3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java @@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt { .add("schedule_type") .add("start_time") .add("end_time") + .add("priority") .build(); private long jobId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 18bb916b8bd..36986dc9d4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -62,6 +62,7 @@ public class ShowColumnStatsStmt extends ShowStmt { .add("trigger") .add("query_times") .add("updated_time") + .add("last_analyze_version") .build(); private final TableName tableName; @@ -162,6 +163,7 @@ public class ShowColumnStatsStmt extends ShowStmt { row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType)); row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes)); row.add(String.valueOf(p.second.updatedTime)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.tableVersion)); result.add(row); }); return new ShowResultSet(getMetaData(), result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index 91b8bf1de2d..915b5f19e03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -57,6 +57,7 @@ public class ShowTableStatsStmt extends ShowStmt { .add("trigger") .add("new_partition") .add("user_inject") + .add("last_analyze_time") .build(); private static final ImmutableList<String> INDEX_TITLE_NAMES = @@ -192,6 +193,7 @@ public class ShowTableStatsStmt extends ShowStmt { row.add(""); row.add(""); row.add(""); + row.add(""); result.add(row); return new ShowResultSet(getMetaData(), result); } @@ -201,15 +203,18 @@ public class ShowTableStatsStmt extends ShowStmt { row.add(String.valueOf(tableStatistic.updatedRows)); row.add(String.valueOf(tableStatistic.queriedTimes.get())); row.add(String.valueOf(tableStatistic.rowCount)); - LocalDateTime dateTime = + LocalDateTime tableUpdateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime), java.time.ZoneId.systemDefault()); - String formattedDateTime = dateTime.format(formatter); - row.add(formattedDateTime); + LocalDateTime lastAnalyzeTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime), + java.time.ZoneId.systemDefault()); + row.add(tableUpdateTime.format(formatter)); row.add(tableStatistic.analyzeColumns().toString()); row.add(tableStatistic.jobType.toString()); row.add(String.valueOf(tableStatistic.newPartitionLoaded.get())); row.add(String.valueOf(tableStatistic.userInjected)); + row.add(lastAnalyzeTime.format(formatter)); result.add(row); return new ShowResultSet(getMetaData(), result); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index cf0885e6ec2..321d2f53fc7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -258,6 +258,7 @@ import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; +import org.apache.doris.statistics.StatisticsJobAppender; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -540,6 +541,7 @@ public class Env { private final LoadManagerAdapter loadManagerAdapter; private StatisticsAutoCollector statisticsAutoCollector; + private StatisticsJobAppender statisticsJobAppender; private HiveTransactionMgr hiveTransactionMgr; @@ -780,6 +782,7 @@ public class Env { this.analysisManager = new AnalysisManager(); this.statisticsCleaner = new StatisticsCleaner(); this.statisticsAutoCollector = new StatisticsAutoCollector(); + this.statisticsJobAppender = new StatisticsJobAppender("StatisticsJobAppender"); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); @@ -1078,12 +1081,6 @@ public class Env { // If not using bdb, we need to notify the FE type transfer manually. notifyNewFETypeTransfer(FrontendNodeType.MASTER); } - if (statisticsCleaner != null) { - statisticsCleaner.start(); - } - if (statisticsAutoCollector != null) { - statisticsAutoCollector.start(); - } queryCancelWorker.start(); } @@ -1623,6 +1620,15 @@ public class Env { if (analysisManager != null) { analysisManager.getStatisticsCache().preHeat(); } + if (statisticsCleaner != null) { + statisticsCleaner.start(); + } + if (statisticsAutoCollector != null) { + statisticsAutoCollector.start(); + } + if (statisticsJobAppender != null) { + statisticsJobAppender.start(); + } } catch (Throwable e) { // When failed to transfer to master, we need to exit the process. // Otherwise, the process will be in an unknown state. @@ -6327,6 +6333,10 @@ public class Env { return statisticsAutoCollector; } + public StatisticsJobAppender getStatisticsJobAppender() { + return statisticsJobAppender; + } + public NereidsSqlCacheManager getSqlCacheManager() { return sqlCacheManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 1d7d87d27f8..716f2bdca44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -350,7 +350,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return true; } return System.currentTimeMillis() - - tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); + - tblStats.lastAnalyzeTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 741f20dd88f..fc92efd7f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -2799,6 +2799,7 @@ public class ShowExecutor { java.time.ZoneId.systemDefault()); row.add(startTime.format(formatter)); row.add(endTime.format(formatter)); + row.add(analysisInfo.priority == null ? "N/A" : analysisInfo.priority.name()); resultRows.add(row); } catch (Exception e) { LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 0b8d8c229cd..f314f3aa76c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2513,7 +2513,7 @@ public class StmtExecutor { context.getState().setOk(); } - private void handleAnalyzeStmt() throws DdlException, AnalysisException { + private void handleAnalyzeStmt() throws DdlException, AnalysisException, ExecutionException, InterruptedException { context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index 125b23bce7b..463b53bf645 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -195,6 +195,13 @@ public class AnalysisInfo implements Writable { @SerializedName("rowCount") public final long rowCount; + + @SerializedName("priority") + public final JobPriority priority; + + @SerializedName("tv") + public final long tableVersion; + /** * * Used to store the newest partition version of tbl when creating this job. @@ -214,7 +221,7 @@ public class AnalysisInfo implements Writable { boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject, - long rowCount) { + long rowCount, JobPriority priority, long tableVersion) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -253,6 +260,8 @@ public class AnalysisInfo implements Writable { this.emptyJob = emptyJob; this.userInject = userInject; this.rowCount = rowCount; + this.priority = priority; + this.tableVersion = tableVersion; } @Override @@ -295,6 +304,8 @@ public class AnalysisInfo implements Writable { sj.add("forceFull: " + forceFull); sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn); sj.add("emptyJob: " + emptyJob); + sj.add("priority: " + priority.name()); + sj.add("tableVersion: " + tableVersion); return sj.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 2e7c4078ca1..2dd79030220 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -65,6 +65,8 @@ public class AnalysisInfoBuilder { private boolean emptyJob; private boolean userInject = false; private long rowCount; + private JobPriority priority; + private long tableVersion; public AnalysisInfoBuilder() { } @@ -105,6 +107,8 @@ public class AnalysisInfoBuilder { emptyJob = info.emptyJob; userInject = info.userInject; rowCount = info.rowCount; + priority = info.priority; + tableVersion = info.tableVersion; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -282,12 +286,23 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setPriority(JobPriority priority) { + this.priority = priority; + return this; + } + + public AnalysisInfoBuilder setTableVersion(long tableVersion) { + this.tableVersion = tableVersion; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, jobColumns, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount, + priority, tableVersion); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 5f6206e854e..0e4a1c7b42d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -99,6 +99,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -144,7 +145,8 @@ public class AnalysisManager implements Writable { return statisticsCache; } - public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException, AnalysisException { + public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) + throws DdlException, AnalysisException, ExecutionException, InterruptedException { if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) { throw new DdlException("Stats table not available, please make sure your cluster status is normal"); } @@ -157,10 +159,8 @@ public class AnalysisManager implements Writable { public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException, AnalysisException { DatabaseIf<TableIf> db = analyzeDBStmt.getDb(); - // Using auto analyzer if user specifies. if (analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { - Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db); - return; + throw new DdlException("Analyze database doesn't support use.auto.analyzer property."); } List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties()); if (!analyzeDBStmt.isSync()) { @@ -208,22 +208,12 @@ public class AnalysisManager implements Writable { } // Each analyze stmt corresponding to an analysis job. - public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException, AnalysisException { + public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) + throws DdlException, AnalysisException, ExecutionException, InterruptedException { // Using auto analyzer if user specifies. if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector(); - if (autoCollector.skip(stmt.getTable())) { - return; - } - List<AnalysisInfo> jobs = new ArrayList<>(); - autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs, stmt.getTable()); - if (jobs.isEmpty()) { - return; - } - AnalysisInfo job = autoCollector.getNeedAnalyzeColumns(jobs.get(0)); - if (job != null) { - Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job); - } + autoCollector.processOneJob(stmt.getTable(), JobPriority.MANUAL_AUTO); return; } AnalysisInfo jobInfo = buildAndAssignJob(stmt); @@ -347,7 +337,6 @@ public class AnalysisManager implements Writable { infoBuilder.setAnalysisMode(analysisMode); infoBuilder.setAnalysisMethod(analysisMethod); infoBuilder.setScheduleType(scheduleType); - infoBuilder.setLastExecTimeInMs(0); infoBuilder.setCronExpression(cronExpression); infoBuilder.setForceFull(stmt.forceFull()); infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn()); @@ -377,6 +366,8 @@ public class AnalysisManager implements Writable { && analysisMethod.equals(AnalysisMethod.SAMPLE)); long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount(); infoBuilder.setRowCount(rowCount); + infoBuilder.setPriority(JobPriority.MANUAL); + infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0); return infoBuilder.build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 5277d8025fc..3db9a862d10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import java.util.Comparator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -88,9 +89,9 @@ public class AnalysisTaskExecutor { } } - public void submitTask(BaseAnalysisTask task) { + public Future<?> submitTask(BaseAnalysisTask task) { AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task); - executors.submit(taskWrapper); + return executors.submit(taskWrapper); } public void putJob(AnalysisTaskWrapper wrapper) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java index 445641b2505..f9da52b01cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java @@ -43,13 +43,17 @@ public class ColStatsMeta { @SerializedName("trigger") public JobType jobType; + @SerializedName("tv") + public long tableVersion; + public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, - AnalysisType analysisType, JobType jobType, long queriedTimes) { + AnalysisType analysisType, JobType jobType, long queriedTimes, long tableVersion) { this.updatedTime = updatedTime; this.analysisMethod = analysisMethod; this.analysisType = analysisType; this.jobType = jobType; this.queriedTimes.addAndGet(queriedTimes); + this.tableVersion = tableVersion; } public void clear() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java new file mode 100644 index 00000000000..2d45dad877b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +public enum JobPriority { + HIGH, + LOW, + MANUAL, + MANUAL_AUTO; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index c1a8af93ac0..1d7818a8e8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -18,172 +18,125 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.StringJoiner; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class StatisticsAutoCollector extends StatisticsCollector { +public class StatisticsAutoCollector extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class); + public static final int JOB_QUEUE_LIMIT = 100; + private final BlockingQueue<TableIf> highPriorityJobs = new ArrayBlockingQueue<>(JOB_QUEUE_LIMIT); + private final BlockingQueue<TableIf> lowPriorityJobs = new ArrayBlockingQueue<>(JOB_QUEUE_LIMIT); + + protected final AnalysisTaskExecutor analysisTaskExecutor; + public StatisticsAutoCollector() { - super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), - new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP)); + super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10)); + this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, + StatisticConstants.TASK_QUEUE_CAP); } @Override - protected void collect() { - if (canCollect()) { - analyzeAll(); + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + try { + collect(); + } catch (DdlException | ExecutionException | InterruptedException e) { + LOG.warn("One auto analyze job failed. ", e); } } - protected boolean canCollect() { - return StatisticsUtil.enableAutoAnalyze() - && StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); - } - - protected void analyzeAll() { - List<CatalogIf> catalogs = getCatalogsInOrder(); - for (CatalogIf ctl : catalogs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (!ctl.enableAutoAnalyze()) { - continue; + protected void collect() throws DdlException, ExecutionException, InterruptedException { + if (!StatisticsUtil.canCollect()) { + return; + } + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + while (true) { + Pair<TableIf, JobPriority> jobPair = fetchOneJob(); + TableIf table = jobPair.first; + if (table == null) { + return; } - List<DatabaseIf> dbs = getDatabasesInOrder(ctl); - for (DatabaseIf<TableIf> databaseIf : dbs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) { - continue; - } - try { - analyzeDb(databaseIf); - } catch (Throwable t) { - LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t); - continue; - } + TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); + if (table.needReAnalyzeTable(tblStats) || StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) { + processOneJob(table, jobPair.second); } } } - public List<CatalogIf> getCatalogsInOrder() { - return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() - .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); - } - - public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) { - return catalog.getAllDbs().stream() - .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); - } - - public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) { - return db.getTables().stream() - .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); - } - - public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException { - List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf); - for (AnalysisInfo analysisInfo : analysisInfos) { - try { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - analysisInfo = getNeedAnalyzeColumns(analysisInfo); - if (analysisInfo == null) { - continue; - } - createSystemAnalysisJob(analysisInfo); - } catch (Throwable t) { - analysisInfo.message = t.getMessage(); - LOG.warn("Failed to auto analyze table {}.{}, reason {}", - databaseIf.getFullName(), analysisInfo.tblId, analysisInfo.message, t); - continue; - } + protected Pair<TableIf, JobPriority> fetchOneJob() { + TableIf table = null; + JobPriority priority = null; + try { + table = highPriorityJobs.poll(1, TimeUnit.SECONDS); + priority = JobPriority.HIGH; + } catch (InterruptedException e) { + LOG.debug(e); } - } - - protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) { - List<AnalysisInfo> analysisInfos = new ArrayList<>(); - for (TableIf table : getTablesInOrder(db)) { + if (table == null) { try { - if (skip(table)) { - continue; - } - createAnalyzeJobForTbl(db, analysisInfos, table); - } catch (Throwable t) { - LOG.warn("Failed to analyze table {}.{}.{}", - db.getCatalog().getName(), db.getFullName(), table.getName(), t); + table = lowPriorityJobs.poll(1, TimeUnit.SECONDS); + priority = JobPriority.LOW; + } catch (InterruptedException e) { + LOG.debug(e); } } - return analysisInfos; - } - - // return true if skip auto analyze this time. - protected boolean skip(TableIf table) { - if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { - return true; - } - // For now, only support Hive HMS table auto collection. - if (table instanceof HMSExternalTable - && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - return true; - } - if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { - return false; - } - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - // means it's never got analyzed or new partition loaded data. - if (tableStats == null || tableStats.newPartitionLoaded.get()) { - return false; + if (table == null) { + LOG.debug("Job queues are all empty."); } - if (tableStats.userInjected) { - return true; - } - return System.currentTimeMillis() - - tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); + return Pair.of(table, priority); } - protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db, - List<AnalysisInfo> analysisInfos, TableIf table) { - AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; - if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { - OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { - LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); - return; - } + protected void processOneJob(TableIf table, JobPriority priority) + throws DdlException, ExecutionException, InterruptedException { + List<Pair<String, String>> needRunColumns = table.getColumnIndexPairs( + table.getSchemaAllIndexes(false) + .stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName) + .collect(Collectors.toSet())); + if (needRunColumns == null || needRunColumns.isEmpty()) { + return; + } + AnalysisMethod analysisMethod = + table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() + ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + if (!tableRowCountReported(table, analysisMethod)) { + return; } // We don't auto analyze empty table to avoid all 0 stats. // Because all 0 is more dangerous than unknown stats when row count report is delayed. @@ -198,12 +151,33 @@ public class StatisticsAutoCollector extends StatisticsCollector { } return; } + StringJoiner stringJoiner = new StringJoiner(",", "[", "]"); + for (Pair<String, String> pair : needRunColumns) { + stringJoiner.add(pair.toString()); + } + AnalysisInfo jobInfo = createAnalysisInfo(table, analysisMethod, rowCount, + stringJoiner.toString(), needRunColumns, priority); + executeSystemAnalysisJob(jobInfo); + } + + protected boolean tableRowCountReported(TableIf table, AnalysisMethod analysisMethod) { + if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { + OlapTable ot = (OlapTable) table; + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { + LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); + return false; + } + } + return true; + } + + protected AnalysisInfo createAnalysisInfo(TableIf table, AnalysisMethod analysisMethod, long rowCount, + String colNames, List<Pair<String, String>> needRunColumns, JobPriority priority) { AnalysisInfo jobInfo = new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) - .setCatalogId(db.getCatalog().getId()) - .setDBId(db.getId()) + .setCatalogId(table.getDatabase().getCatalog().getId()) + .setDBId(table.getDatabase().getId()) .setTblId(table.getId()) - .setColName(null) .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS) .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) .setAnalysisMethod(analysisMethod) @@ -216,37 +190,46 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0 - && analysisMethod.equals(AnalysisMethod.SAMPLE)) + && analysisMethod.equals(AnalysisMethod.SAMPLE)) .setRowCount(rowCount) + .setColName(colNames) + .setJobColumns(needRunColumns) + .setPriority(priority) + .setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0) .build(); - analysisInfos.add(jobInfo); + return jobInfo; } - @VisibleForTesting - protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) { - TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); - // Skip tables that are too wide. - if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { - return null; + // Analysis job created by the system + protected void executeSystemAnalysisJob(AnalysisInfo jobInfo) + throws DdlException, ExecutionException, InterruptedException { + if (jobInfo.jobColumns.isEmpty()) { + // No statistics need to be collected or updated + return; } - - AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); - TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); - - List<Pair<String, String>> needRunColumns = null; - if (table.needReAnalyzeTable(tblStats)) { - needRunColumns = table.getColumnIndexPairs(table.getSchemaAllIndexes(false) - .stream().map(Column::getName).collect(Collectors.toSet())); + Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); + if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { + analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); } - - if (needRunColumns == null || needRunColumns.isEmpty()) { - return null; + Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); + Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); + Future<?>[] futures = new Future[analysisTasks.values().size()]; + int i = 0; + for (BaseAnalysisTask task : analysisTasks.values()) { + futures[i++] = analysisTaskExecutor.submitTask(task); } - StringJoiner stringJoiner = new StringJoiner(",", "[", "]"); - for (Pair<String, String> pair : needRunColumns) { - stringJoiner.add(pair.toString()); + for (Future future : futures) { + future.get(); } - return new AnalysisInfoBuilder(jobInfo) - .setColName(stringJoiner.toString()).setJobColumns(needRunColumns).build(); + } + + public void appendToHighPriorityJobs(TableIf table) throws InterruptedException { + highPriorityJobs.put(table); + } + + public boolean appendToLowPriorityJobs(TableIf table) { + return lowPriorityJobs.offer(table); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java deleted file mode 100644 index ec187fe893a..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.hudi.common.util.VisibleForTesting; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.HashMap; -import java.util.Map; - -public abstract class StatisticsCollector extends MasterDaemon { - - private static final Logger LOG = LogManager.getLogger(StatisticsCollector.class); - - protected final AnalysisTaskExecutor analysisTaskExecutor; - - public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { - super(name, intervalMs); - this.analysisTaskExecutor = analysisTaskExecutor; - } - - @Override - protected void runAfterCatalogReady() { - if (!Env.getCurrentEnv().isMaster()) { - return; - } - if (!StatisticsUtil.statsTblAvailable()) { - LOG.info("Stats table not available, skip"); - return; - } - if (Env.isCheckpointThread()) { - return; - } - collect(); - } - - protected abstract void collect(); - - // Analysis job created by the system - @VisibleForTesting - protected void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - if (jobInfo.jobColumns.isEmpty()) { - // No statistics need to be collected or updated - return; - } - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { - analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); - } - Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); - Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); - analysisTasks.values().forEach(analysisTaskExecutor::submitTask); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java new file mode 100644 index 00000000000..3703cf236c0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class StatisticsJobAppender extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(StatisticsJobAppender.class); + + public StatisticsJobAppender(String name) { + super(name, TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); + } + + @Override + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.getCurrentEnv().getStatisticsAutoCollector() == null) { + LOG.info("Statistics auto collector not ready, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + if (!Env.getCurrentEnv().isReady()) { + return; + } + if (!StatisticsUtil.canCollect()) { + LOG.debug("Auto analyze not enabled or not in analyze time range."); + return; + } + traverseAllTables(); + } + + protected void traverseAllTables() { + List<CatalogIf> catalogs = getCatalogsInOrder(); + AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); + StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector(); + for (CatalogIf ctl : catalogs) { + if (!StatisticsUtil.canCollect()) { + break; + } + if (!ctl.enableAutoAnalyze()) { + continue; + } + List<DatabaseIf> dbs = getDatabasesInOrder(ctl); + for (DatabaseIf<TableIf> db : dbs) { + if (!StatisticsUtil.canCollect()) { + break; + } + if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) { + continue; + } + for (TableIf table : getTablesInOrder(db)) { + try { + if (skip(table)) { + continue; + } + TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); + if (table.needReAnalyzeTable(tblStats)) { + autoCollector.appendToHighPriorityJobs(table); + } else if (StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) { + autoCollector.appendToLowPriorityJobs(table); + } + } catch (Throwable t) { + LOG.warn("Failed to analyze table {}.{}.{}", + ctl.getName(), db.getFullName(), table.getName(), t); + } + } + } + } + } + + public List<CatalogIf> getCatalogsInOrder() { + return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() + .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); + } + + public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) { + return catalog.getAllDbs().stream() + .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); + } + + public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) { + return db.getTables().stream() + .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); + } + + // return true if skip auto analyze this time. + protected boolean skip(TableIf table) { + if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { + return true; + } + // For now, only support Hive HMS table auto collection. + if (table instanceof HMSExternalTable + && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + return true; + } + // Skip wide table. + if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { + return true; + } + if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { + return false; + } + TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); + // means it's never got analyzed or new partition loaded data. + if (tableStats == null || tableStats.newPartitionLoaded.get()) { + return false; + } + if (tableStats.userInjected) { + return true; + } + return System.currentTimeMillis() + - tableStats.lastAnalyzeTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 50739e98aea..33f099e2b0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable, GsonPostProcessable { @SerializedName("updateTime") public long updatedTime; + @SerializedName("lat") + public long lastAnalyzeTime; + @SerializedName("colNameToColStatsMeta") private ConcurrentMap<String, ColStatsMeta> deprecatedColNameToColStatsMeta = new ConcurrentHashMap<>(); @@ -156,19 +159,21 @@ public class TableStatsMeta implements Writable, GsonPostProcessable { public void update(AnalysisInfo analyzedJob, TableIf tableIf) { updatedTime = analyzedJob.tblUpdateTime; + lastAnalyzeTime = analyzedJob.createTime; if (analyzedJob.userInject) { userInjected = true; } for (Pair<String, String> colPair : analyzedJob.jobColumns) { ColStatsMeta colStatsMeta = colToColStatsMeta.get(colPair); if (colStatsMeta == null) { - colToColStatsMeta.put(colPair, new ColStatsMeta(updatedTime, - analyzedJob.analysisMethod, analyzedJob.analysisType, analyzedJob.jobType, 0)); + colToColStatsMeta.put(colPair, new ColStatsMeta(lastAnalyzeTime, analyzedJob.analysisMethod, + analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.tableVersion)); } else { - colStatsMeta.updatedTime = updatedTime; + colStatsMeta.updatedTime = lastAnalyzeTime; colStatsMeta.analysisType = analyzedJob.analysisType; colStatsMeta.analysisMethod = analyzedJob.analysisMethod; colStatsMeta.jobType = analyzedJob.jobType; + colStatsMeta.tableVersion = analyzedJob.tableVersion; } } jobType = analyzedJob.jobType; @@ -233,4 +238,9 @@ public class TableStatsMeta implements Writable, GsonPostProcessable { public boolean isColumnsStatsEmpty() { return colToColStatsMeta == null || colToColStatsMeta.isEmpty(); } + + @VisibleForTesting + public void setColToColStatsMeta(ConcurrentMap<Pair<String, String>, ColStatsMeta> colToColStatsMeta) { + this.colToColStatsMeta = colToColStatsMeta; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 288eb88e95f..718260a25b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -51,6 +51,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.InternalCatalog; @@ -70,6 +71,7 @@ import org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Histogram; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticConstants; +import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.system.Frontend; import com.google.common.base.Preconditions; @@ -904,4 +906,37 @@ public class StatisticsUtil { return rowCount == 0; } + public static boolean canCollect() { + return enableAutoAnalyze() && inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); + } + + public static boolean tableNotAnalyzedForTooLong(TableIf table, TableStatsMeta tblStats) { + if (table == null || tblStats == null) { + LOG.warn("Table or stats is null."); + return false; + } + if (tblStats.userInjected) { + return false; + } + if (!(table instanceof OlapTable)) { + return false; + } + boolean isLongTime = Config.auto_analyze_interval_seconds > 0 + && System.currentTimeMillis() - tblStats.lastAnalyzeTime > Config.auto_analyze_interval_seconds * 1000; + if (!isLongTime) { + return false; + } + // For OlapTable, if update rows is 0, row count doesn't change since last analyze + // and table visible version doesn't change since last analyze. Then we skip analyzing it. + if (tblStats.updatedRows.get() != 0) { + return true; + } + long rowCount = table.getRowCount(); + if (rowCount != tblStats.rowCount) { + return true; + } + long visibleVersion = ((OlapTable) table).getVisibleVersion(); + return tblStats.analyzeColumns().stream() + .anyMatch(c -> tblStats.findColumnStatsMeta(c.first, c.second).tableVersion != visibleVersion); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 4f40071f501..d687d111d2c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -17,504 +17,102 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.Type; -import org.apache.doris.catalog.View; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; -import com.google.common.collect.Lists; -import mockit.Expectations; -import mockit.Injectable; import mockit.Mock; import mockit.MockUp; -import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.time.LocalTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutionException; public class StatisticsAutoCollectorTest { @Test - public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) { - new MockUp<CatalogIf>() { - @Mock - public Collection<DatabaseIf> getAllDbs() { - Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME); - Database db2 = new Database(2, "anyDB"); - List<DatabaseIf> databaseIfs = new ArrayList<>(); - databaseIfs.add(db1); - databaseIfs.add(db2); - return databaseIfs; - } - }; + public void testCollect() throws DdlException, ExecutionException, InterruptedException { + StatisticsAutoCollector collector = new StatisticsAutoCollector(); + final int[] count = {0, 0}; new MockUp<StatisticsAutoCollector>() { @Mock - public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<TableIf> db) { - return Arrays.asList(analysisInfo, analysisInfo); - } - - int count = 0; - - @Mock - public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { - return count++ == 0 ? null : jobInfo; - } - - @Mock - public void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - - } - }; - - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - saa.runAfterCatalogReady(); - new Expectations() { - { - try { - saa.createSystemAnalysisJob((AnalysisInfo) any); - times = 1; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - @Test - public void testConstructAnalysisInfo( - @Injectable OlapTable o2, @Injectable View v) { - new MockUp<Database>() { - @Mock - public List<Table> getTables() { - List<Table> tableIfs = new ArrayList<>(); - tableIfs.add(o2); - tableIfs.add(v); - return tableIfs; - } - - @Mock - public String getFullName() { - return "anyDb"; - } - }; - - new MockUp<OlapTable>() { - @Mock - public String getName() { - return "anytable"; - } - - @Mock - public List<Column> getSchemaAllIndexes(boolean full) { - List<Column> columns = new ArrayList<>(); - columns.add(new Column("c1", PrimitiveType.INT)); - columns.add(new Column("c2", PrimitiveType.HLL)); - return columns; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new Database(1, "anydb")); - Assertions.assertEquals(1, analysisInfoList.size()); - Assertions.assertNull(analysisInfoList.get(0).colName); - } - - @Test - public void testSkipWideTable() { - - TableIf tableIf = new OlapTable(); - - new MockUp<OlapTable>() { - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); - } - - @Mock - public List<Pair<String, String>> getColumnIndexPairs(Set<String> columns) { - ArrayList<Pair<String, String>> list = Lists.newArrayList(); - list.add(Pair.of("1", "1")); - return list; + protected Pair<TableIf, JobPriority> fetchOneJob() { + count[0]++; + return Pair.of(null, JobPriority.LOW); } }; - new MockUp<StatisticsUtil>() { - int count = 0; - int[] thresholds = {1, 10}; - - @Mock - public TableIf findTable(long catalogName, long dbName, long tblName) { - return tableIf; - } - - @Mock - public int getAutoAnalyzeTableWidthThreshold() { - return thresholds[count++]; - } - }; - - AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build(); - StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); - Assertions.assertNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo)); - Assertions.assertNotNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo)); - } - - @Test - public void testLoop() { - AtomicBoolean timeChecked = new AtomicBoolean(); - AtomicBoolean switchChecked = new AtomicBoolean(); - new MockUp<StatisticsUtil>() { - - @Mock - public boolean inAnalyzeTime(LocalTime now) { - timeChecked.set(true); - return true; - } - - @Mock - public boolean enableAutoAnalyze() { - switchChecked.set(true); - return true; - } - }; - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - autoCollector.collect(); - Assertions.assertTrue(timeChecked.get() && switchChecked.get()); - - } - - @Test - public void checkAvailableThread() { - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num, - autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize()); - } - - @Test - public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) { - new MockUp<OlapTable>() { - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000; - } - }; - - new MockUp<AnalysisManager>() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - // A very huge table has been updated recently, so we should skip it this time - stats.updatedTime = System.currentTimeMillis() - 1000; - stats.newPartitionLoaded = new AtomicBoolean(); - stats.newPartitionLoaded.set(true); - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - // Test new partition loaded data for the first time. Not skip. - Assertions.assertFalse(autoCollector.skip(olapTable)); - stats.newPartitionLoaded.set(false); - // Assertions.assertTrue(autoCollector.skip(olapTable)); - // The update of this huge table is long time ago, so we shouldn't skip it this time - stats.updatedTime = System.currentTimeMillis() - - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; - Assertions.assertFalse(autoCollector.skip(olapTable)); new MockUp<AnalysisManager>() { - @Mock public TableStatsMeta findTableStatsStatus(long tblId) { + count[1]++; return null; } }; - // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time - Assertions.assertFalse(autoCollector.skip(olapTable)); - new MockUp<AnalysisManager>() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - stats.userInjected = true; - Assertions.assertTrue(autoCollector.skip(olapTable)); - // this is not olap table nor external table, so we should skip it this time - Assertions.assertTrue(autoCollector.skip(anyOtherTable)); - } - - // For small table, use full - @Test - public void testCreateAnalyzeJobForTbl1( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp<Database>() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp<OlapTable>() { - - int count = 0; - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - - @Mock - public List<Long> getMvColumnIndexIds(String columnName) { - ArrayList<Long> objects = new ArrayList<>(); - objects.add(-1L); - return objects; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - - new MockUp<StatisticsUtil>() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List<AnalysisInfo> jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); - AnalysisInfo jobInfo = jobInfos.get(0); - List<Pair<String, String>> columnNames = Lists.newArrayList(); - columnNames.add(Pair.of("test", "t1")); - jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(columnNames).build(); - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNull(task.getTableSample()); - } - } - - // for big table, use sample - @Test - public void testCreateAnalyzeJobForTbl2( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp<Database>() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp<OlapTable>() { - - int count = 0; - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - - @Mock - public List<Long> getMvColumnIndexIds(String columnName) { - ArrayList<Long> objects = new ArrayList<>(); - objects.add(-1L); - return objects; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - - new MockUp<StatisticsUtil>() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List<AnalysisInfo> jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); - AnalysisInfo jobInfo = jobInfos.get(0); - List<Pair<String, String>> colNames = Lists.newArrayList(); - colNames.add(Pair.of("test", "1")); - jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(colNames).build(); - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNotNull(task.getTableSample()); - } - } - - @Test - public void testDisableAuto1() throws Exception { - InternalCatalog catalog1 = EnvFactory.createInternalCatalog(); - List<CatalogIf> catalogs = Lists.newArrayList(); - catalogs.add(catalog1); + collector.collect(); + Assertions.assertEquals(1, count[0]); + Assertions.assertEquals(0, count[1]); + OlapTable table = new OlapTable(); new MockUp<StatisticsAutoCollector>() { @Mock - public List<CatalogIf> getCatalogsInOrder() { - return catalogs; - } - - @Mock - protected boolean canCollect() { - return false; + protected Pair<TableIf, JobPriority> fetchOneJob() { + if (count[0] == 0) { + count[0]++; + return Pair.of(table, JobPriority.LOW); + } + count[0]++; + return Pair.of(null, JobPriority.LOW); } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1) {{ - catalog1.enableAutoAnalyze(); - times = 0; - }}; - - sac.analyzeAll(); + count[0] = 0; + count[1] = 0; + collector.collect(); + Assertions.assertEquals(2, count[0]); + Assertions.assertEquals(1, count[1]); } @Test - public void testDisableAuto2() throws Exception { - InternalCatalog catalog1 = EnvFactory.createInternalCatalog(); - List<CatalogIf> catalogs = Lists.newArrayList(); - catalogs.add(catalog1); - - Database db1 = new Database(); - List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList(); - dbs.add(db1); - - new MockUp<StatisticsAutoCollector>() { - int count = 0; - boolean[] canCollectReturn = {true, false}; - @Mock - public List<CatalogIf> getCatalogsInOrder() { - return catalogs; - } - - @Mock - public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) { - return dbs; - } - - @Mock - protected boolean canCollect() { - return canCollectReturn[count++]; - } - - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1, db1) {{ - catalog1.enableAutoAnalyze(); - result = true; - times = 1; - db1.getFullName(); - times = 0; - }}; - - sac.analyzeAll(); + public void testFetchOneJob() throws InterruptedException { + OlapTable table1 = new OlapTable(); + OlapTable table2 = new OlapTable(); + StatisticsAutoCollector collector = new StatisticsAutoCollector(); + collector.appendToHighPriorityJobs(table1); + collector.appendToLowPriorityJobs(table2); + Pair<TableIf, JobPriority> jobPair = collector.fetchOneJob(); + Assertions.assertSame(table1, jobPair.first); + Assertions.assertEquals(JobPriority.HIGH, jobPair.second); + jobPair = collector.fetchOneJob(); + Assertions.assertSame(table2, jobPair.first); + Assertions.assertEquals(JobPriority.LOW, jobPair.second); + jobPair = collector.fetchOneJob(); + Assertions.assertNull(jobPair.first); } @Test - public void testCreateAnalyzeJobForTbl() { + public void testTableRowCountReported() { StatisticsAutoCollector collector = new StatisticsAutoCollector(); - OlapTable table = new OlapTable(); + ExternalTable externalTable = new ExternalTable(); + Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.SAMPLE)); + OlapTable olapTable = new OlapTable(); + Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.FULL)); + Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.FULL)); new MockUp<OlapTable>() { - @Mock - public long getDataSize(boolean singleReplica) { - return 100; - } - @Mock public long getRowCountForIndex(long indexId, boolean strict) { - return -1; - } - - @Mock - public boolean isPartitionedTable() { - return false; + return TableIf.UNKNOWN_ROW_COUNT; } }; - List<AnalysisInfo> infos = Lists.newArrayList(); - collector.createAnalyzeJobForTbl(null, infos, table); - Assertions.assertEquals(0, infos.size()); + Assertions.assertFalse(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE)); new MockUp<OlapTable>() { @Mock public long getRowCountForIndex(long indexId, boolean strict) { - return 100; + return TableIf.UNKNOWN_ROW_COUNT + 1; } }; - Assertions.assertThrows(NullPointerException.class, () -> collector.createAnalyzeJobForTbl(null, infos, table)); + Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java new file mode 100644 index 00000000000..f57791a1a5c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.statistics.util.StatisticsUtil; + +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class StatisticsJobAppenderTest { + @Test + public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) { + new MockUp<OlapTable>() { + + @Mock + public long getDataSize(boolean singleReplica) { + return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000; + } + }; + + new MockUp<AnalysisManager>() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return stats; + } + }; + // A very huge table has been updated recently, so we should skip it this time + stats.updatedTime = System.currentTimeMillis() - 1000; + stats.newPartitionLoaded = new AtomicBoolean(); + stats.newPartitionLoaded.set(true); + StatisticsJobAppender appender = new StatisticsJobAppender("appender"); + // Test new partition loaded data for the first time. Not skip. + Assertions.assertFalse(appender.skip(olapTable)); + stats.newPartitionLoaded.set(false); + // The update of this huge table is long time ago, so we shouldn't skip it this time + stats.updatedTime = System.currentTimeMillis() + - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; + Assertions.assertFalse(appender.skip(olapTable)); + new MockUp<AnalysisManager>() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return null; + } + }; + // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time + Assertions.assertFalse(appender.skip(olapTable)); + new MockUp<AnalysisManager>() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return stats; + } + }; + stats.userInjected = true; + Assertions.assertTrue(appender.skip(olapTable)); + + // this is not olap table nor external table, so we should skip it this time + Assertions.assertTrue(appender.skip(anyOtherTable)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java index 724e0363833..0e221673e9d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java @@ -17,10 +17,20 @@ package org.apache.doris.statistics.util; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.ColStatsMeta; import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.TableStatsMeta; import com.google.common.collect.Lists; import mockit.Mock; @@ -33,6 +43,8 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; class StatisticsUtilTest { @Test @@ -150,4 +162,77 @@ class StatisticsUtilTest { // \\''"" Assertions.assertEquals("\\\\''\"", StatisticsUtil.escapeSQL(origin)); } + + @Test + public void testTableNotAnalyzedForTooLong() throws InterruptedException { + TableStatsMeta tableMeta = new TableStatsMeta(); + OlapTable olapTable = new OlapTable(); + ExternalTable externalTable = new ExternalTable(); + + // Test table or stats is null + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(null, tableMeta)); + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, null)); + + // Test user injected + tableMeta.userInjected = true; + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test External table + tableMeta.userInjected = false; + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(externalTable, tableMeta)); + + // Test config is 0 + Config.auto_analyze_interval_seconds = 0; + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test time not long enough + Config.auto_analyze_interval_seconds = 86400; + tableMeta.lastAnalyzeTime = System.currentTimeMillis(); + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test time long enough and update rows > 0 + Config.auto_analyze_interval_seconds = 1; + tableMeta.lastAnalyzeTime = System.currentTimeMillis(); + Thread.sleep(2000); + tableMeta.updatedRows.set(10); + Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test row count is not equal with last analyze + tableMeta.updatedRows.set(0); + tableMeta.rowCount = 10; + new MockUp<Table>() { + @Mock + public long getRowCount() { + return 100; + } + }; + Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test visible version changed + new MockUp<OlapTable>() { + @Mock + public long getVisibleVersion() { + return 100; + } + }; + new MockUp<Table>() { + @Mock + public long getRowCount() { + return 10; + } + }; + ConcurrentMap<Pair<String, String>, ColStatsMeta> colToColStatsMeta = new ConcurrentHashMap<>(); + ColStatsMeta col1Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100); + ColStatsMeta col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 101); + colToColStatsMeta.put(Pair.of("index1", "col1"), col1Meta); + colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta); + tableMeta.setColToColStatsMeta(colToColStatsMeta); + Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + + // Test visible version unchanged. + col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100); + colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta); + tableMeta.setColToColStatsMeta(colToColStatsMeta); + Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta)); + } } diff --git a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy index 8a34bf9204f..56f5a0bd338 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy @@ -36,13 +36,14 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc logger.info("catalog " + catalog_name + " created") // Test analyze table without init. - sql """analyze database ${catalog_name}.statistics PROPERTIES("use.auto.analyzer"="true")""" + sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")""" sql """use ${catalog_name}.statistics""" for (int i = 0; i < 10; i++) { - Thread.sleep(1000) + Thread.sleep(2000) def result = sql """show column stats `statistics` (lo_quantity)""" - if (result.size <= 0) { + if (result.size() <= 0) { + sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")""" continue; } assertEquals(result.size(), 1) @@ -56,7 +57,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc assertEquals(result[0][8], "N/A") result = sql """show column stats `statistics` (lo_orderkey)""" - if (result.size <= 0) { + if (result.size() <= 0) { continue; } assertEquals(result.size(), 1) @@ -70,7 +71,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc assertEquals(result[0][8], "N/A") result = sql """show column stats `statistics` (lo_linenumber)""" - if (result.size <= 0) { + if (result.size() <= 0) { continue; } assertEquals(result.size(), 1) @@ -82,6 +83,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc assertEquals(result[0][6], "4.0") assertEquals(result[0][7], "N/A") assertEquals(result[0][8], "N/A") + break } sql """drop catalog ${catalog_name}""" diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index b4edc5e9d7b..c518a15d7dd 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2837,7 +2837,7 @@ PARTITION `p599` VALUES IN (599) // Test auto analyze with job type SYSTEM sql """drop stats trigger_test""" - sql """analyze database trigger PROPERTIES("use.auto.analyzer"="true")""" + sql """analyze table trigger_test PROPERTIES("use.auto.analyzer"="true")""" int i = 0; for (0; i < 10; i++) { result = sql """show column stats trigger_test""" @@ -2941,7 +2941,38 @@ PARTITION `p599` VALUES IN (599) new_part_result = sql """show column stats part(colint)""" assertEquals("2.0", new_part_result[0][2]) - sql """DROP DATABASE IF EXISTS trigger""" + + // Test show last analyze table version + sql """create database if not exists test_version""" + sql """use test_version""" + sql """drop table if exists region""" + sql """ + CREATE TABLE region ( + r_regionkey int NOT NULL, + r_name VARCHAR(25) NOT NULL, + r_comment VARCHAR(152) + )ENGINE=OLAP + DUPLICATE KEY(`r_regionkey`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """analyze table region with sync""" + def versionResult = sql """show column stats region""" + assertEquals(versionResult[0][14], "1") + assertEquals(versionResult[1][14], "1") + assertEquals(versionResult[2][14], "1") + + sql """insert into region values (1, "1", "1")""" + sql """analyze table region with sync""" + versionResult = sql """show column stats region""" + assertEquals(versionResult[0][14], "2") + assertEquals(versionResult[1][14], "2") + assertEquals(versionResult[2][14], "2") + + sql """drop database if exists test_version""" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org