This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new afb6a57aa8 [enhancement](nereids) Improve stats preload performance (#21970)
afb6a57aa8 is described below

commit afb6a57aa8741ecc0a61754100d17703d5c9894c
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Jul 31 17:32:01 2023 +0800

    [enhancement](nereids) Improve stats preload performance (#21970)
---
 .../main/java/org/apache/doris/common/Config.java  |  13 ++-
 .../org/apache/doris/analysis/AnalyzeTblStmt.java  |   7 +-
 .../java/org/apache/doris/nereids/memo/Memo.java   |   2 +-
 .../apache/doris/statistics/AnalysisManager.java   |  17 ++--
 .../doris/statistics/AnalysisTaskExecutor.java     |  40 +++-----
 .../doris/statistics/AnalysisTaskScheduler.java    | 108 ---------------------
 .../apache/doris/statistics/BaseAnalysisTask.java  |   8 +-
 .../apache/doris/statistics/ColumnStatistic.java   |  14 +--
 .../apache/doris/statistics/OlapAnalysisTask.java  |  31 +++---
 .../doris/statistics/StatisticsAutoAnalyzer.java   |  25 +++--
 .../apache/doris/statistics/StatisticsCache.java   |  66 ++++++++-----
 .../doris/statistics/StatisticsRepository.java     |  15 +--
 .../doris/statistics/util/StatisticsUtil.java      |  17 +++-
 .../apache/doris/statistics/AnalysisJobTest.java   |  10 +-
 .../doris/statistics/AnalysisTaskExecutorTest.java |  28 ++----
 .../apache/doris/statistics/HistogramTaskTest.java |  14 +--
 regression-test/pipeline/p0/conf/fe.conf           |   2 +-
 17 files changed, 158 insertions(+), 259 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a47ae31cdd..1b4c61a866 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1680,7 +1680,7 @@ public class Config extends ConfigBase {
      * Used to determined how many statistics collection SQL could run simultaneously.
      */
     @ConfField
-    public static int statistics_simultaneously_running_task_num = 10;
+    public static int statistics_simultaneously_running_task_num = 5;
 
     /**
      * if table has too many replicas, Fe occur oom when schema change.
@@ -2031,13 +2031,13 @@ public class Config extends ConfigBase {
     public static int hive_stats_partition_sample_size = 3000;
 
     @ConfField
-    public static boolean enable_full_auto_analyze = false;
+    public static boolean enable_full_auto_analyze = true;
 
     @ConfField
     public static String full_auto_analyze_start_time = "00:00:00";
 
     @ConfField
-    public static String full_auto_analyze_end_time = "23:59:59";
+    public static String full_auto_analyze_end_time = "02:00:00";
 
     @ConfField
     public static int statistics_sql_parallel_exec_instance_num = 1;
@@ -2056,4 +2056,11 @@ public class Config extends ConfigBase {
             + "and modifying table properties. "
             + "This config is recommended to be used only in the test environment"})
     public static int force_olap_table_replication_num = 0;
+
+    @ConfField
+    public static int full_auto_analyze_simultaneously_running_task_num = 1;
+
+    @ConfField
+    public static int cpu_resource_limit_per_analyze_task = 1;
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
index da08f45bee..527f802748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
@@ -36,6 +36,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -192,10 +193,10 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
             }
         }
         if (containsUnsupportedTytpe) {
-            if (!ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
+            if (ConnectContext.get() == null
+                    || !ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
                 columnNames = columnNames.stream()
-                        .filter(c -> !ColumnStatistic.UNSUPPORTED_TYPE.contains(
-                                table.getColumn(c).getType()))
+                        .filter(c -> !StatisticsUtil.isUnsupportedType(table.getColumn(c).getType()))
                         .collect(Collectors.toList());
             } else {
                 throw new AnalysisException(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
index a313975684..18ea08d58e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
@@ -115,7 +115,7 @@ public class Memo {
 
     public void removePhysicalExpression() {
         groupExpressions.entrySet().removeIf(entry -> entry.getValue().getPlan() instanceof PhysicalPlan);
-        Iterator<Entry<GroupId, Group>> iterator = groups.entrySet().iterator();
+        Iterator<Map.Entry<GroupId, Group>> iterator = groups.entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<GroupId, Group> entry = iterator.next();
             Group group = entry.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 66f0b94aa8..63ab923234 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -94,8 +94,6 @@ import java.util.stream.Collectors;
 
 public class AnalysisManager extends Daemon implements Writable {
 
-    public AnalysisTaskScheduler taskScheduler;
-
     private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
 
     private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -112,8 +110,7 @@ public class AnalysisManager extends Daemon implements Writable {
     public AnalysisManager() {
         super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
         if (!Env.isCheckpointThread()) {
-            this.taskScheduler = new AnalysisTaskScheduler();
-            this.taskExecutor = new AnalysisTaskExecutor(taskScheduler);
+            this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
             this.statisticsCache = new StatisticsCache();
             taskExecutor.start();
         }
     }
@@ -192,7 +189,9 @@ public class AnalysisManager extends Daemon implements Writable {
                     table.getName());
             // columnNames null means to add all visitable columns.
             AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
-                    null, db.getId(), table);
+                    table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())).map(
+                            Column::getName).collect(
+                            Collectors.toList()), db.getId(), table);
             try {
                 analyzeTblStmt.check();
             } catch (AnalysisException analysisException) {
@@ -254,12 +253,13 @@ public class AnalysisManager extends Daemon implements Writable {
             return null;
         }
 
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+        analysisTaskInfos.values().forEach(taskExecutor::submitTask);
         return jobInfo;
     }
 
     // Analysis job created by the system
-    public void createSystemAnalysisJob(AnalysisInfo info) throws DdlException {
+    public void createSystemAnalysisJob(AnalysisInfo info, AnalysisTaskExecutor analysisTaskExecutor)
+            throws DdlException {
         AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
         if (jobInfo.colToPartitions.isEmpty()) {
             // No statistics need to be collected or updated
@@ -273,8 +273,7 @@ public class AnalysisManager extends Daemon implements Writable {
             persistAnalysisJob(jobInfo);
             analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
         }
-
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+        analysisTaskInfos.values().forEach(taskExecutor::submitTask);
     }
 
     private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index b5ec7aeb87..fb23050fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -35,26 +35,23 @@ public class AnalysisTaskExecutor extends Thread {
 
     private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
 
-    private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
-            Config.statistics_simultaneously_running_task_num,
-            Config.statistics_simultaneously_running_task_num, 0,
-            TimeUnit.DAYS, new LinkedBlockingQueue<>(),
-            new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
-            "Analysis Job Executor", true);
-
-    private final AnalysisTaskScheduler taskScheduler;
+    private final ThreadPoolExecutor executors;
 
     private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
             new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
                     Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
-    public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
-        this.taskScheduler = jobExecutor;
+    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
+        executors = ThreadPoolManager.newDaemonThreadPool(
+                simultaneouslyRunningTaskNum,
+                simultaneouslyRunningTaskNum, 0,
+                TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+                new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
+                "Analysis Job Executor", true);
     }
 
     @Override
     public void run() {
-        fetchAndExecute();
         cancelExpiredTask();
     }
 
@@ -82,22 +79,7 @@ public class AnalysisTaskExecutor extends Thread {
         }
     }
 
-    public void fetchAndExecute() {
-        Thread t = new Thread(() -> {
-            for (;;) {
-                try {
-                    doFetchAndExecute();
-                } catch (Throwable throwable) {
-                    LOG.warn(throwable);
-                }
-            }
-        }, "Analysis Task Submitter");
-        t.setDaemon(true);
-        t.start();
-    }
-
-    private void doFetchAndExecute() {
-        BaseAnalysisTask task = taskScheduler.getPendingTasks();
+    public void submitTask(BaseAnalysisTask task) {
         AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
         executors.submit(taskWrapper);
     }
@@ -105,4 +87,8 @@ public class AnalysisTaskExecutor extends Thread {
     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
         taskQueue.put(wrapper);
     }
+
+    public boolean idle() {
+        return executors.getQueue().isEmpty();
+    }
 }
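With AnalysisTaskScheduler gone (deleted below), producers now hand tasks straight to AnalysisTaskExecutor.submitTask, which feeds a fixed-size daemon pool, and idle() simply asks whether that pool's queue is empty. Illustration only, not part of the patch: a pure-JDK sketch of the same shape, with ThreadPoolManager and the Doris task types replaced by standard classes.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class SimpleTaskExecutor {
        private final ThreadPoolExecutor pool;

        public SimpleTaskExecutor(int simultaneouslyRunningTaskNum) {
            // Fixed-size pool with an unbounded queue, mirroring the executor built in the commit.
            pool = new ThreadPoolExecutor(simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum,
                    0, TimeUnit.DAYS, new LinkedBlockingQueue<>());
        }

        public void submitTask(Runnable task) {
            pool.submit(task);
        }

        public boolean idle() {
            // Same signal the auto analyzer uses before queueing another round of jobs.
            return pool.getQueue().isEmpty();
        }

        public static void main(String[] args) throws InterruptedException {
            SimpleTaskExecutor executor = new SimpleTaskExecutor(1);
            executor.submitTask(() -> System.out.println("analyze task running"));
            Thread.sleep(100);
            System.out.println("idle: " + executor.idle());
            executor.pool.shutdown();
        }
    }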
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
deleted file mode 100644
index 5c9de2b58b..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-public class AnalysisTaskScheduler {
-
-    private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
-
-    private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
-            new PriorityQueue<>(Comparator.comparingLong(BaseAnalysisTask::getLastExecTime));
-
-    private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
-
-    private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
-
-    private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
-
-    public synchronized void schedule(BaseAnalysisTask analysisTask) {
-        try {
-
-            switch (analysisTask.info.jobType) {
-                case MANUAL:
-                    addToManualJobQueue(analysisTask);
-                    break;
-                case SYSTEM:
-                    addToSystemQueue(analysisTask);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown job type: " + analysisTask.info.jobType);
-            }
-        } catch (Throwable t) {
-            Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
-                    analysisTask.info, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis());
-        }
-    }
-
-    // Make sure invoker of this method is synchronized on object.
-
-    private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
-        if (systemJobSet.contains(analysisJobInfo)) {
-            return;
-        }
-        systemJobSet.add(analysisJobInfo);
-        systemJobQueue.add(analysisJobInfo);
-        notify();
-    }
-
-    // Make sure invoker of this method is synchronized on object.
-    private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
-        if (manualJobSet.contains(analysisJobInfo)) {
-            return;
-        }
-        manualJobSet.add(analysisJobInfo);
-        manualJobQueue.add(analysisJobInfo);
-        notify();
-    }
-
-    public synchronized BaseAnalysisTask getPendingTasks() {
-        while (true) {
-            if (!manualJobQueue.isEmpty()) {
-                return pollAndRemove(manualJobQueue, manualJobSet);
-            }
-            if (!systemJobQueue.isEmpty()) {
-                return pollAndRemove(systemJobQueue, systemJobSet);
-            }
-            try {
-                wait();
-            } catch (Exception e) {
-                LOG.warn("Thread get interrupted when waiting for pending jobs", e);
-                return null;
-            }
-        }
-    }
-
-    // Poll from queue, remove from set. Make sure invoker of this method is synchronized on object.
-    private BaseAnalysisTask pollAndRemove(Queue<BaseAnalysisTask> q, Set<BaseAnalysisTask> s) {
-        BaseAnalysisTask t = q.poll();
-        s.remove(t);
-        return t;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index e146a2e8e3..fc264e0661 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -26,8 +26,10 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -152,10 +154,8 @@ public abstract class BaseAnalysisTask {
         if (col == null) {
             throw new RuntimeException(String.format("Column with name %s not exists", info.tblName));
         }
-        if (isUnsupportedType(col.getType().getPrimitiveType())) {
-            throw new RuntimeException(String.format("Column with type %s is not supported",
-                    col.getType().toString()));
-        }
+        Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
+                String.format("Column with type %s is not supported", col.getType().toString()));
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 6887108a68..7e23c83e07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -19,9 +19,7 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -206,9 +204,6 @@ public class ColumnStatistic {
                 columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
             }
             columnStatisticBuilder.setSelectivity(1.0);
-            Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName)
-                    .orElse(null);
-            columnStatisticBuilder.setHistogram(histogram);
             columnStatisticBuilder.setUpdatedTime(resultRow.getColumnValue("update_time"));
             return columnStatisticBuilder.build();
         } catch (Exception e) {
@@ -428,12 +423,7 @@ public class ColumnStatistic {
         return isUnKnown;
     }
 
-    public void loadPartitionStats(long tableId, long idxId, String colName) throws DdlException {
-        List<ResultRow> resultRows = StatisticsRepository.loadPartStats(tableId, idxId, colName);
-        for (ResultRow resultRow : resultRows) {
-            String partId = resultRow.getColumnValue("part_id");
-            ColumnStatistic columnStatistic = ColumnStatistic.fromResultRow(resultRow);
-            partitionIdToColStats.put(Long.parseLong(partId), columnStatistic);
-        }
+    public void putPartStats(long partId, ColumnStatistic columnStatistic) {
+        this.partitionIdToColStats.put(partId, columnStatistic);
     }
 }
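Both AnalyzeTblStmt and BaseAnalysisTask above now funnel the type check through StatisticsUtil.isUnsupportedType, which rejects the complex types (ARRAY, MAP, STRUCT, VARIANT) on top of whatever ColumnStatistic.UNSUPPORTED_TYPE already lists. Illustration only, not part of the patch: the same stream-filter pattern with plain stand-ins for the Doris Column/Type classes.

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class UnsupportedColumnFilter {
        // Stand-in for the Doris Column/Type model used by the real statement.
        record Column(String name, String type) {}

        // Illustrative set; the real check also consults ColumnStatistic.UNSUPPORTED_TYPE.
        static final Set<String> UNSUPPORTED = Set.of("ARRAY", "MAP", "STRUCT", "VARIANT");

        static List<String> supportedColumnNames(List<Column> schema) {
            return schema.stream()
                    .filter(c -> !UNSUPPORTED.contains(c.type()))   // skip types stats cannot handle
                    .map(Column::name)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Column> schema = List.of(new Column("id", "INT"), new Column("tags", "ARRAY"));
            System.out.println(supportedColumnNames(schema)); // [id]
        }
    }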
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index a980385bde..3f3a04d620 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -110,21 +110,24 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
 
     @VisibleForTesting
     public void execSQL(String sql) throws Exception {
-        if (killed) {
-            return;
-        }
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                    info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+        synchronized (OlapAnalysisTask.class) {
+            if (killed) {
+                return;
+            }
+            long startTime = System.currentTimeMillis();
+            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                r.connectContext.setExecutor(stmtExecutor);
+                stmtExecutor.execute();
+                QueryState queryState = r.connectContext.getState();
+                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
+                            info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+                }
+                LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
             }
-            LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
         }
+    }
 }
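The OlapAnalysisTask change above wraps execSQL in synchronized (OlapAnalysisTask.class), so analyze statements issued by different pool threads run one at a time. Illustration only, not part of the patch: the class-level-lock pattern on a stand-in task class.

    public class SerializedSqlRunner {
        // The lock is the class object itself, so every instance shares it and
        // concurrent pool threads execute their SQL strictly one after another.
        public void execSql(String sql) {
            synchronized (SerializedSqlRunner.class) {
                System.out.println(Thread.currentThread().getName() + " running: " + sql);
                // The real task would call stmtExecutor.execute() here.
            }
        }

        public static void main(String[] args) {
            SerializedSqlRunner runner = new SerializedSqlRunner();
            for (int i = 0; i < 3; i++) {
                final int n = i;
                new Thread(() -> runner.execSql("ANALYZE stmt " + n)).start();
            }
        }
    }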
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index 181af16882..02fe96ec71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -50,8 +50,11 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
 
+    private AnalysisTaskExecutor analysisTaskExecutor;
+
     public StatisticsAutoAnalyzer() {
         super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+        analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
     }
 
     @Override
@@ -66,12 +69,16 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             return;
         }
 
-        // if (!Config.enable_full_auto_analyze) {
-        //     analyzePeriodically();
-        //     analyzeAutomatically();
-        // } else {
-        //     analyzeAll();
-        // }
+        if (!analysisTaskExecutor.idle()) {
+            return;
+        }
+
+        if (!Config.enable_full_auto_analyze) {
+            analyzePeriodically();
+            analyzeAutomatically();
+        } else {
+            analyzeAll();
+        }
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -92,7 +99,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
                     if (analysisInfo == null) {
                         continue;
                     }
-                    analysisManager.createSystemAnalysisJob(analysisInfo);
+                    analysisManager.createSystemAnalysisJob(analysisInfo, analysisTaskExecutor);
                 }
             }
         } catch (Throwable t) {
@@ -109,7 +116,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
             for (AnalysisInfo jobInfo : jobInfos) {
                 jobInfo = new AnalysisInfoBuilder(jobInfo).setJobType(JobType.SYSTEM).build();
-                analysisManager.createSystemAnalysisJob(jobInfo);
+                analysisManager.createSystemAnalysisJob(jobInfo, analysisTaskExecutor);
             }
         } catch (DdlException e) {
             LOG.warn("Failed to periodically analyze the statistics." + e);
@@ -124,7 +131,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             try {
                 checkedJobInfo = getReAnalyzeRequiredPart(jobInfo);
                 if (checkedJobInfo != null) {
-                    analysisManager.createSystemAnalysisJob(checkedJobInfo);
+                    analysisManager.createSystemAnalysisJob(checkedJobInfo, analysisTaskExecutor);
                 }
             } catch (Throwable t) {
                 LOG.warn("Failed to create analyze job: {}", checkedJobInfo, t);
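StatisticsAutoAnalyzer now owns its own single-slot AnalysisTaskExecutor and skips a scheduling round whenever that executor still has queued work. Illustration only, not part of the patch: a pure-JDK sketch of the same guard, with ScheduledExecutorService and ThreadPoolExecutor standing in for MasterDaemon and the Doris executor.

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class AutoAnalyzeLoop {
        public static void main(String[] args) {
            // Stand-in for the dedicated full-auto-analyze executor (pool size 1 in the commit).
            ThreadPoolExecutor worker =
                    new ThreadPoolExecutor(1, 1, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>());
            ScheduledExecutorService daemon = Executors.newSingleThreadScheduledExecutor();

            // Runs until the process is stopped; each tick mirrors one MasterDaemon round.
            daemon.scheduleAtFixedRate(() -> {
                // Same guard as the new idle() check: do nothing while earlier jobs are still queued.
                if (!worker.getQueue().isEmpty()) {
                    System.out.println("executor busy, skip this round");
                    return;
                }
                worker.submit(() -> System.out.println("collect stats for one table"));
            }, 0, 1, TimeUnit.SECONDS);
        }
    }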
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 1149ecdd5a..d5a3b972f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -38,10 +38,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -213,6 +216,7 @@ public class StatisticsCache {
         if (CollectionUtils.isEmpty(recentStatsUpdatedCols)) {
             return;
         }
+        Map<StatisticsCacheKey, ColumnStatistic> keyToColStats = new HashMap<>();
         for (ResultRow r : recentStatsUpdatedCols) {
             try {
                 long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
@@ -221,12 +225,17 @@ public class StatisticsCache {
                 final StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colId);
                 final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
-                c.loadPartitionStats(tblId, idxId, colId);
+                keyToColStats.put(k, c);
                 putCache(k, c);
             } catch (Throwable t) {
                 LOG.warn("Error when preheating stats cache", t);
             }
         }
+        try {
+            loadPartStats(keyToColStats);
+        } catch (Exception e) {
+            LOG.warn("Fucka", e);
+        }
     }
 
     public void syncLoadColStats(long tableId, long idxId, String colName) {
@@ -261,32 +270,43 @@ public class StatisticsCache {
     }
 
     public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
-        CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {
-
-            @Override
-            public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
-                return Optional.of(c);
-            }
-
-            @Override
-            public boolean isDone() {
-                return true;
-            }
+        CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>();
+        f.obtrudeValue(Optional.of(c));
+        columnStatisticsCache.put(k, f);
+    }
 
-            @Override
-            public boolean complete(Optional<ColumnStatistic> value) {
-                return true;
+    private void loadPartStats(Map<StatisticsCacheKey, ColumnStatistic> keyToColStats) {
+        final int batchSize = Config.expr_children_limit;
+        Set<StatisticsCacheKey> keySet = new HashSet<>();
+        for (StatisticsCacheKey statisticsCacheKey : keyToColStats.keySet()) {
+            if (keySet.size() < batchSize - 1) {
+                keySet.add(statisticsCacheKey);
+            } else {
+                List<ResultRow> partStats = StatisticsRepository.loadPartStats(keySet);
+                addPartStatsToColStats(keyToColStats, partStats);
+                keySet = new HashSet<>();
             }
+        }
+        if (!keySet.isEmpty()) {
+            List<ResultRow> partStats = StatisticsRepository.loadPartStats(keySet);
+            addPartStatsToColStats(keyToColStats, partStats);
+        }
+    }
 
-            @Override
-            public Optional<ColumnStatistic> join() {
-                return Optional.of(c);
+    private void addPartStatsToColStats(Map<StatisticsCacheKey, ColumnStatistic> keyToColStats,
+            List<ResultRow> partsStats) {
+        for (ResultRow r : partsStats) {
+            try {
+                long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
+                long idxId = Long.parseLong(r.getColumnValue("idx_id"));
+                long partId = Long.parseLong(r.getColumnValue("part_id"));
+                String colId = r.getColumnValue("col_id");
+                ColumnStatistic partStats = ColumnStatistic.fromResultRow(r);
+                keyToColStats.get(new StatisticsCacheKey(tblId, idxId, colId)).putPartStats(partId, partStats);
+            } catch (Throwable t) {
+                LOG.warn("Failed to deserialized part stats", t);
             }
-        };
-        if (c.isUnKnown) {
-            return;
         }
-        columnStatisticsCache.put(k, f);
     }
 }
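The cache preload above no longer calls loadPartitionStats once per column; it remembers the preheated keys and fetches their partition rows in batches of Config.expr_children_limit, then pushes each row back onto the matching ColumnStatistic via putPartStats. Illustration only, not part of the patch: a generic batching sketch with stand-in key and loader types.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    public class BatchLoader {
        /** Splits keys into chunks of at most batchSize and hands each chunk to the loader. */
        static <K> void loadInBatches(List<K> keys, int batchSize, Consumer<List<K>> loader) {
            List<K> batch = new ArrayList<>(batchSize);
            for (K key : keys) {
                batch.add(key);
                if (batch.size() == batchSize) {
                    loader.accept(batch);          // e.g. one SQL query per chunk
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) {
                loader.accept(batch);              // trailing partial chunk
            }
        }

        public static void main(String[] args) {
            List<Integer> keys = List.of(1, 2, 3, 4, 5);
            loadInBatches(keys, 2, batch -> System.out.println("load " + batch));
            // load [1, 2] / load [3, 4] / load [5]
        }
    }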
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index d20bb358c1..7a043e7708 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -38,6 +38,7 @@ import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -126,7 +127,7 @@ public class StatisticsRepository {
 
     private static final String QUERY_PARTITION_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME + "."
             + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
-            + " tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}' "
+            + " ${inPredicate}"
            + " AND part_id IS NOT NULL";
 
     public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
@@ -440,12 +441,14 @@ public class StatisticsRepository {
                 .replace(QUERY_COLUMN_STATISTICS));
     }
 
-    public static List<ResultRow> loadPartStats(long tableId, long idxId, String colName) {
+    public static List<ResultRow> loadPartStats(Collection<StatisticsCacheKey> keys) {
+        String inPredicate = "CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)";
+        StringJoiner sj = new StringJoiner(",");
+        for (StatisticsCacheKey statisticsCacheKey : keys) {
+            sj.add("'" + statisticsCacheKey.toString() + "'");
+        }
         Map<String, String> params = new HashMap<>();
-        params.put("tblId", String.valueOf(tableId));
-        params.put("idxId", String.valueOf(idxId));
-        params.put("colId", colName);
-
+        params.put("inPredicate", String.format(inPredicate, sj.toString()));
         return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                 .replace(QUERY_PARTITION_STATISTICS));
     }
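loadPartStats now receives a whole collection of cache keys and substitutes a single IN predicate over CONCAT(tbl_id, '-', idx_id, '-', col_id), so one query replaces N per-column lookups. Illustration only, not part of the patch: the same string building with a simplified stand-in key whose toString() is assumed to produce "tblId-idxId-colId".

    import java.util.List;
    import java.util.StringJoiner;

    public class InPredicateBuilder {
        // Simplified stand-in for StatisticsCacheKey.
        record Key(long tblId, long idxId, String colId) {
            @Override
            public String toString() {
                return tblId + "-" + idxId + "-" + colId;
            }
        }

        static String buildInPredicate(List<Key> keys) {
            StringJoiner sj = new StringJoiner(",");
            for (Key key : keys) {
                sj.add("'" + key + "'");
            }
            return String.format("CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)", sj);
        }

        public static void main(String[] args) {
            System.out.println(buildInPredicate(List.of(new Key(10, -1, "c1"), new Key(11, -1, "c2"))));
            // CONCAT(tbl_id, '-', idx_id, '-', col_id) in ('10--1-c1','11--1-c2')
        }
    }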
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 0451097967..e82a8b955c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -29,17 +29,21 @@ import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -168,6 +172,7 @@ public class StatisticsUtil {
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         sessionVariable.internalSession = true;
         sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
+        sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
         sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
@@ -633,7 +638,7 @@ public class StatisticsUtil {
     }
 
     private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec,
-                                        String colName, ColumnStatisticBuilder columnStatisticBuilder) {
+            String colName, ColumnStatisticBuilder columnStatisticBuilder) {
         int colId = -1;
         for (Types.NestedField column : partitionSpec.schema().columns()) {
             if (column.name().equals(colName)) {
@@ -651,4 +656,14 @@ public class StatisticsUtil {
         columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
                 + dataFile.nullValueCounts().get(colId));
     }
+
+    public static boolean isUnsupportedType(Type type) {
+        if (ColumnStatistic.UNSUPPORTED_TYPE.contains(type)) {
+            return true;
+        }
+        return type instanceof ArrayType
+                || type instanceof StructType
+                || type instanceof MapType
+                || type instanceof VariantType;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index ca145f07f2..fa0c0fc8ef 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -62,13 +62,7 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception {
-        new Expectations() {
-            {
-                scheduler.schedule((BaseAnalysisTask) any);
-                times = 3;
-            }
-        };
+    public void testCreateAnalysisJob() throws Exception {
 
         new MockUp<StatisticsUtil>() {
 
@@ -101,7 +95,7 @@ public class AnalysisJobTest extends TestWithFeService {
     }
 
     @Test
-    public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, @Mocked StmtExecutor stmtExecutor)
+    public void testJobExecution(@Mocked StmtExecutor stmtExecutor)
             throws Exception {
         new MockUp<StatisticsUtil>() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 8da819f09c..453eb78628 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -28,9 +28,9 @@ import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.Maps;
+import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -41,8 +41,6 @@ import java.util.concurrent.BlockingQueue;
 
 public class AnalysisTaskExecutorTest extends TestWithFeService {
 
-    @Mocked
-    AnalysisTaskScheduler analysisTaskScheduler;
 
     @Override
     protected void runBeforeAll() throws Exception {
@@ -71,13 +69,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
                 .build();
         OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisJobInfo);
 
-        new MockUp<AnalysisTaskScheduler>() {
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return analysisJob;
-            }
-        };
-
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
         BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
         AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
         Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
@@ -102,7 +94,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
             }
         };
 
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
         HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
         colToPartitions.put("col1", Collections.singleton("t1"));
         AnalysisInfo analysisInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
@@ -114,16 +106,16 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
                 .setColToPartitions(colToPartitions)
                 .build();
         OlapAnalysisTask task = new OlapAnalysisTask(analysisInfo);
-        new MockUp<AnalysisTaskScheduler>() {
-            @Mock
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return task;
-            }
-        };
+
         new MockUp<AnalysisManager>() {
             @Mock
             public void updateTaskStatus(AnalysisInfo info, AnalysisState jobState, String message, long time) {}
         };
-        Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+        new Expectations() {
+            {
+                task.doExecute();
+            }
+        };
+        Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
index d3d5245a81..0660c994a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
@@ -30,7 +30,6 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
 import org.junit.FixMethodOrder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -43,9 +42,6 @@ import java.util.concurrent.ConcurrentMap;
 @FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
 public class HistogramTaskTest extends TestWithFeService {
 
-    @Mocked
-    AnalysisTaskScheduler analysisTaskScheduler;
-
     @Override
     protected void runBeforeAll() throws Exception {
         createDatabase("histogram_task_test");
@@ -96,7 +92,7 @@ public class HistogramTaskTest extends TestWithFeService {
 
     @Test
     public void test2TaskExecution() throws Exception {
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
         AnalysisInfo analysisInfo = new AnalysisInfoBuilder()
                 .setJobId(0).setTaskId(0).setCatalogName("internal")
                 .setDbName(SystemInfoService.DEFAULT_CLUSTER + ":" + "histogram_task_test").setTblName("t1")
@@ -107,17 +103,11 @@ public class HistogramTaskTest extends TestWithFeService {
                 .build();
         HistogramTask task = new HistogramTask(analysisInfo);
 
-        new MockUp<AnalysisTaskScheduler>() {
-            @Mock
-            public synchronized BaseAnalysisTask getPendingTasks() {
-                return task;
-            }
-        };
         new MockUp<AnalysisManager>() {
             @Mock
             public void updateTaskStatus(AnalysisInfo info, AnalysisState jobState, String message, long time) {}
         };
-        Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+        Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
     }
 }
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 592bab5556..772ad23a9d 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -83,4 +83,4 @@ enable_mtmv = true
 
 dynamic_partition_check_interval_seconds=3
 
-enable_full_auto_analyze=false
+enable_full_auto_analyze=true

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org