This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 05cf095506 [feature](stats) Support full auto analyze (#21192) 05cf095506 is described below commit 05cf0955063852ccba24219e6ca6d1c360dff025 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Mon Jul 17 20:42:57 2023 +0800 [feature](stats) Support full auto analyze (#21192) 1. Auto analyze all tables except for internal tables 2. make resource used by analyze configurable --- .../main/java/org/apache/doris/common/Config.java | 22 +++++-- .../apache/doris/analysis/AnalyzeProperties.java | 9 +++ .../org/apache/doris/analysis/AnalyzeTblStmt.java | 7 ++- .../org/apache/doris/datasource/CatalogIf.java | 5 ++ .../org/apache/doris/datasource/CatalogMgr.java | 6 ++ .../apache/doris/datasource/ExternalCatalog.java | 8 +++ .../apache/doris/datasource/InternalCatalog.java | 5 ++ .../apache/doris/statistics/AnalysisManager.java | 20 +++--- .../doris/statistics/StatisticsAutoAnalyzer.java | 71 +++++++++++++++++++--- .../doris/statistics/util/StatisticsUtil.java | 6 +- 10 files changed, 133 insertions(+), 26 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e901542ea5..d614d7f9bc 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1444,12 +1444,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int cbo_default_sample_percentage = 10; - /* - * if true, will allow the system to collect statistics automatically - */ - @ConfField(mutable = true, masterOnly = true) - public static boolean enable_auto_collect_statistics = true; - /* * the system automatically checks the time interval for statistics */ @@ -2025,4 +2019,20 @@ public class Config extends ConfigBase { "Hive行数估算分区采样数", "Sample size for hive row count estimation."}) public static int hive_stats_partition_sample_size = 3000; + + @ConfField + public static boolean enable_full_auto_analyze = true; + + @ConfField + public static String full_auto_analyze_start_time = "00:00:00"; + + @ConfField + public static String full_auto_analyze_end_time = "23:59:59"; + + @ConfField + public static int statistics_sql_parallel_exec_instance_num = 1; + + @ConfField + public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java index ccb122bc26..8f0167518b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java @@ -24,6 +24,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -41,6 +42,14 @@ public class AnalyzeProperties { public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type"; public static final String PROPERTY_PERIOD_SECONDS = "period.seconds"; + public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() { + { + put(AnalyzeProperties.PROPERTY_SYNC, "false"); + put(AnalyzeProperties.PROPERTY_AUTOMATIC, "false"); + put(AnalyzeProperties.PROPERTY_ANALYSIS_TYPE, AnalysisType.FUNDAMENTALS.toString()); + } + }); + private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(PROPERTY_SYNC) .add(PROPERTY_INCREMENTAL) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index 03681bdb36..83b2ba8f83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -253,8 +253,13 @@ public class AnalyzeTblStmt extends AnalyzeStmt { } private void checkAnalyzePriv(String dbName, String tblName) throws AnalysisException { + ConnectContext ctx = ConnectContext.get(); + // means it a system analyze + if (ctx == null) { + return; + } if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT)) { + .checkTblPriv(ctx, dbName, tblName, PrivPredicate.SELECT)) { ErrorReport.reportAnalysisException( ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "ANALYZE", diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index ce13c9881e..6fa054baa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -161,4 +162,8 @@ public interface CatalogIf<T extends DatabaseIf> { log.setProps(getProperties()); return log; } + + // Return a copy of all db collection. + @SuppressWarnings({"rawtypes", "unchecked"}) + public Collection<DatabaseIf> getAllDbs(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 3ed74e260b..21a39325d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -61,8 +61,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; @@ -1086,5 +1088,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { public Map<Long, CatalogIf> getIdToCatalog() { return idToCatalog; } + + public Set<CatalogIf> getCopyOfCatalog() { + return new HashSet<>(idToCatalog.values()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index cf2de86494..3765ac5153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.external.EsExternalDatabase; @@ -55,6 +56,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -547,4 +550,9 @@ public abstract class ExternalCatalog } return ret; } + + @Override + public Collection<DatabaseIf> getAllDbs() { + return new HashSet<>(idToDb.values()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index e9da68939c..16d6b46d50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2960,4 +2960,9 @@ public class InternalCatalog implements CatalogIf<Database> { public ConcurrentHashMap<Long, Database> getIdToDb() { return new ConcurrentHashMap<>(idToDb); } + + @Override + public Collection<DatabaseIf> getAllDbs() { + return new HashSet<>(idToDb.values()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 864fed0e86..63bb387992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeDBStmt; +import org.apache.doris.analysis.AnalyzeProperties; import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.DropAnalyzeJobStmt; @@ -162,6 +163,14 @@ public class AnalysisManager extends Daemon implements Writable { public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException { DatabaseIf<TableIf> db = analyzeDBStmt.getDb(); + List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties()); + if (!analyzeDBStmt.isSync()) { + sendJobId(analysisInfos, proxy); + } + } + + public List<AnalysisInfo> buildAnalysisInfosForDB(DatabaseIf<TableIf> db, AnalyzeProperties analyzeProperties) + throws DdlException { List<TableIf> tbls = db.getTables(); List<AnalysisInfo> analysisInfos = new ArrayList<>(); db.readLock(); @@ -171,9 +180,9 @@ public class AnalysisManager extends Daemon implements Writable { if (table instanceof View) { continue; } - TableName tableName = new TableName(analyzeDBStmt.getCtlIf().getName(), db.getFullName(), + TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(), table.getName()); - AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeDBStmt.getAnalyzeProperties(), tableName, + AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName, table.getBaseSchema().stream().map( Column::getName).collect( Collectors.toList()), db.getId(), table); @@ -187,13 +196,10 @@ public class AnalysisManager extends Daemon implements Writable { for (AnalyzeTblStmt analyzeTblStmt : analyzeStmts) { analysisInfos.add(buildAndAssignJob(analyzeTblStmt)); } - if (!analyzeDBStmt.isSync()) { - sendJobId(analysisInfos, proxy); - } } finally { db.readUnlock(); } - + return analysisInfos; } // Each analyze stmt corresponding to an analysis job. @@ -245,7 +251,7 @@ public class AnalysisManager extends Daemon implements Writable { } // Analysis job created by the system - public void createAnalysisJob(AnalysisInfo info) throws DdlException { + public void createSystemAnalysisJob(AnalysisInfo info) throws DdlException { AnalysisInfo jobInfo = buildAnalysisJobInfo(info); if (jobInfo.colToPartitions.isEmpty()) { // No statistics need to be collected or updated diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java index aae783ca8b..c44a2a545a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java @@ -17,14 +17,17 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.DdlStmt; +import org.apache.doris.analysis.AnalyzeProperties; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; @@ -32,6 +35,9 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -56,14 +62,44 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { if (!StatisticsUtil.statsTblAvailable()) { return; } - if (Config.enable_auto_collect_statistics) { + + if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { + return; + } + + if (!Config.enable_full_auto_analyze) { analyzePeriodically(); analyzeAutomatically(); + } else { + analyzeAll(); } } - public void autoAnalyzeStats(DdlStmt ddlStmt) { - // TODO Monitor some DDL statements, and then trigger automatic analysis tasks + @SuppressWarnings({"rawtypes", "unchecked"}) + private void analyzeAll() { + Set<CatalogIf> catalogs = Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog(); + for (CatalogIf ctl : catalogs) { + try { + Collection<DatabaseIf> dbs = ctl.getAllDbs(); + for (DatabaseIf<TableIf> databaseIf : dbs) { + if (StatisticConstants.STATISTICS_DB_BLACK_LIST.contains(databaseIf.getFullName())) { + continue; + } + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + List<AnalysisInfo> analysisInfos = analysisManager.buildAnalysisInfosForDB(databaseIf, + AnalyzeProperties.DEFAULT_PROP); + for (AnalysisInfo analysisInfo : analysisInfos) { + analysisInfo = getReAnalyzeRequiredPart(analysisInfo); + if (analysisInfo == null) { + continue; + } + analysisManager.createSystemAnalysisJob(analysisInfo); + } + } + } catch (Throwable t) { + LOG.warn("Failed to analyze all statistics.", t); + } + } } private void analyzePeriodically() { @@ -72,7 +108,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs(); for (AnalysisInfo jobInfo : jobInfos) { jobInfo = new AnalysisInfoBuilder(jobInfo).setJobType(JobType.SYSTEM).build(); - analysisManager.createAnalysisJob(jobInfo); + analysisManager.createSystemAnalysisJob(jobInfo); } } catch (DdlException e) { LOG.warn("Failed to periodically analyze the statistics." + e); @@ -85,12 +121,12 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { for (AnalysisInfo jobInfo : jobInfos) { AnalysisInfo checkedJobInfo = null; try { - checkedJobInfo = checkAutomaticJobInfo(jobInfo); + checkedJobInfo = getReAnalyzeRequiredPart(jobInfo); if (checkedJobInfo != null) { - analysisManager.createAnalysisJob(checkedJobInfo); + analysisManager.createSystemAnalysisJob(checkedJobInfo); } } catch (Throwable t) { - LOG.warn("Failed to create analyze job: {}", checkedJobInfo); + LOG.warn("Failed to create analyze job: {}", checkedJobInfo, t); } } @@ -116,7 +152,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { * @return new job info after check * @throws Throwable failed to check */ - private AnalysisInfo checkAutomaticJobInfo(AnalysisInfo jobInfo) throws Throwable { + private AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) throws Throwable { long lastExecTimeInMs = jobInfo.lastExecTimeInMs; TableIf table = StatisticsUtil .findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName); @@ -212,4 +248,21 @@ public class StatisticsAutoAnalyzer extends MasterDaemon { return new AnalysisInfoBuilder(jobInfo) .setColToPartitions(newColToPartitions).build(); } + + private boolean checkAnalyzeTime(LocalTime now) { + try { + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); + LocalTime start = LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter); + LocalTime end = LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter); + + if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) { + return true; + } else { + return now.isAfter(start) && now.isBefore(end); + } + } catch (DateTimeParseException e) { + LOG.warn("Parse analyze start/end time format fail", e); + return true; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 6a52bb36b1..b118a7d02f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -165,10 +165,10 @@ public class StatisticsUtil { ConnectContext connectContext = new ConnectContext(); SessionVariable sessionVariable = connectContext.getSessionVariable(); sessionVariable.internalSession = true; - sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES); + sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes); sessionVariable.setEnableInsertStrict(true); - sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM; - sessionVariable.parallelPipelineTaskNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM; + sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num; + sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num; sessionVariable.setEnableNereidsPlanner(false); sessionVariable.enableProfile = false; connectContext.setEnv(Env.getCurrentEnv()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org