This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ec0cedab51 [opt](stats) Use single connect context for each olap analyze task ec0cedab51 is described below commit ec0cedab5174bf6bb3424e2796595f7d7619b160 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Thu Aug 10 15:04:28 2023 +0800 [opt](stats) Use single connect context for each olap analyze task 1. add some comment 2. Fix potential NPE caused by deleting a running analyze job 3. Use single connect context for each olap analyze task --- .../apache/doris/statistics/AnalysisManager.java | 14 +++++-- .../apache/doris/statistics/OlapAnalysisTask.java | 44 ++++++++++------------ .../doris/statistics/AnalysisTaskExecutorTest.java | 2 +- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index cb4d9eb034..b5c6ebf602 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); - private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); + // Tracking running manually submitted async tasks, keep in mem only + private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); private StatisticsCache statisticsCache; private AnalysisTaskExecutor taskExecutor; + // Store task information in metadata. private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>()); + + // Store job information in metadata private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>()); + // Tracking system submitted job, keep in mem only private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>(); + // Tracking and control sync analyze tasks, keep in mem only private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>(); private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> { @@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements Writable { } info.lastExecTimeInMs = time; AnalysisInfo job = analysisJobInfoMap.get(info.jobId); + // Job may get deleted during execution. + if (job == null) { + return null; + } // Synchronize the job state change in job level. synchronized (job) { job.lastExecTimeInMs = time; @@ -333,8 +343,6 @@ public class AnalysisManager extends Daemon implements Writable { if (!isSync) { persistAnalysisJob(jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos); - } - if (!isSync) { try { updateTableStats(jobInfo); } catch (Throwable e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 257708de54..71b1191565 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -67,7 +67,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask { params.put("colName", String.valueOf(info.colName)); params.put("tblName", String.valueOf(info.tblName)); params.put("sampleExpr", getSampleExpression()); - List<String> partitionAnalysisSQLs = new ArrayList<>(); + List<String> sqls = new ArrayList<>(); try { tbl.readLock(); Set<String> partNames = info.colToPartitions.get(info.colName); @@ -80,46 +80,40 @@ public class OlapAnalysisTask extends BaseAnalysisTask { // Avoid error when get the default partition params.put("partName", "`" + partName + "`"); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); + sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); } } finally { tbl.readUnlock(); } - execSQLs(partitionAnalysisSQLs); params.remove("partId"); params.put("type", col.getType().toString()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - execSQL(sql); + sqls.add(sql); + execSQLs(sqls); } @VisibleForTesting - public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception { - for (String sql : partitionAnalysisSQLs) { - execSQL(sql); - } - } - - @VisibleForTesting - public void execSQL(String sql) throws Exception { - if (killed) { - return; - } + public void execSQLs(List<String> sqls) throws Exception { long startTime = System.currentTimeMillis(); - LOG.info("ANALYZE SQL : " + sql + " start at " + startTime); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - stmtExecutor = new StmtExecutor(r.connectContext, sql); - r.connectContext.setExecutor(stmtExecutor); - stmtExecutor.execute(); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage())); + for (String sql : sqls) { + if (killed) { + return; + } + LOG.info("ANALYZE SQL : " + sql + " start at " + startTime); + stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); + stmtExecutor.execute(); + QueryState queryState = r.connectContext.getState(); + if (queryState.getStateType().equals(MysqlStateType.ERR)) { + throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", + info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage())); + } } } finally { - LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms"); + LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms"); } } - } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 7bbaf9b902..574c96c73d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -89,7 +89,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { new MockUp<OlapAnalysisTask>() { @Mock - public void execSQL(String sql) throws Exception { + public void execSQLs(List<String> sqls) throws Exception { } }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org