This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b72c71dec0 [fix](stats) Analysis jobs didn't get persisted properly (#18602)
b72c71dec0 is described below

commit b72c71dec06f47c1a4a4200eef3a873d6e6c7841
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Thu Apr 13 17:36:06 2023 +0900

    [fix](stats) Analysis jobs didn't get persisted properly (#18602)

    In the previous implementation, Doris persisted only one task to track
    the status of an analysis job. After this PR, each column analysis task
    is persisted, and a record whose task_id is -1 is stored as the job for
    the user-submitted AnalyzeStmt.

    AnalyzeStmt <---1-1---> AnalysisJob
    AnalysisJob <---1-n---> AnalysisTask
---
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../doris/catalog/InternalSchemaInitializer.java   |   4 +-
 .../apache/doris/statistics/AnalysisManager.java   | 126 +++++++++++++--------
 .../apache/doris/statistics/AnalysisTaskInfo.java  |   4 +
 .../doris/statistics/StatisticsRepository.java     |   4 +-
 5 files changed, 90 insertions(+), 50 deletions(-)
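As a sketch of the new layout (hypothetical ids and an abbreviated column set;
the real schema is created by InternalSchemaInitializer in the diff below),
analyzing two columns of one table now persists three records instead of one:

     job_id | task_id | col_name | state
    --------+---------+----------+---------
      10001 |      -1 |          | PENDING   -- job-level record for the AnalyzeStmt
      10001 |   10002 | c1       | PENDING   -- one task per analyzed column
      10001 |   10003 | c2       | PENDING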
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5bdf74f1fe..0c79634103 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5350,7 +5350,7 @@ public class Env {
     // 1. handle partition level analysis statement properly
     // 2. support sample job
     // 3. support period job
-    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
         analysisManager.createAnalysisJob(analyzeStmt);
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 6feec883a7..3f421d5a86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -191,11 +191,11 @@ public class InternalSchemaInitializer extends Thread {
         columnDefs.add(new ColumnDef("schedule_type", TypeDef.createVarchar(32)));
         String engineName = "olap";
         KeysDesc keysDesc = new KeysDesc(KeysType.UNIQUE_KEYS,
-                Lists.newArrayList("job_id"));
+                Lists.newArrayList("job_id", "task_id"));
         DistributionDesc distributionDesc = new HashDistributionDesc(
                 StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT,
-                Lists.newArrayList("job_id"));
+                Lists.newArrayList("job_id", "task_id"));
         Map<String, String> properties = new HashMap<String, String>() {
             {
                 put("replication_num", String.valueOf(Config.statistic_internal_table_replica_num));

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8eba9eb8fb..ee3c15eae4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -62,7 +62,7 @@ public class AnalysisManager {

     private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE " + FeConstants.INTERNAL_DB_NAME + "."
             + StatisticConstants.ANALYSIS_JOB_TABLE + " "
-            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}";

     private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT "
             + "job_id, catalog_name, db_name, tbl_name, col_name, job_type, "
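For reference, a sketch of what the updated template expands to at runtime,
with hypothetical ids and assuming FeConstants.INTERNAL_DB_NAME and
StatisticConstants.ANALYSIS_JOB_TABLE resolve to __internal_schema and
analysis_jobs:

    UPDATE __internal_schema.analysis_jobs
    SET state = 'RUNNING' , last_exec_time_in_ms=1681375566000
    WHERE job_id = 10001 and task_id=10002;

The added "and task_id=${taskId}" predicate is what lets each task row be
updated independently instead of overwriting every row of the job.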
@@ -90,7 +90,8 @@ public class AnalysisManager {
         return statisticsCache;
     }

-    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+    // Each analyze stmt corresponds to one analysis job.
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
         String catalogName = analyzeStmt.getCatalogName();
         String db = analyzeStmt.getDBName();
         TableName tbl = analyzeStmt.getTblName();
@@ -99,7 +100,6 @@ public class AnalysisManager {
         Set<String> partitionNames = analyzeStmt.getPartitionNames();
         Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
         long jobId = Env.getCurrentEnv().getNextId();
-
         // If the analysis is not incremental, need to delete existing statistics.
         // we cannot collect histograms incrementally and do not support it
         if (!analyzeStmt.isIncrement && !analyzeStmt.isHistogram) {
@@ -112,58 +112,87 @@ public class AnalysisManager {
             StatisticsRepository.dropStatistics(dbId, tblIds, colNames, partIds);
         }

-        if (colNames != null) {
-            for (String colName : colNames) {
+        createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames, partitionNames, analysisTaskInfos, jobId);
+        createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, partitionNames, analysisTaskInfos, jobId);
+        persistAnalysisJob(catalogName, db, tbl, jobId);
+
+        if (analyzeStmt.isSync()) {
+            syncExecute(analysisTaskInfos.values());
+            return;
+        }
+
+        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+    }
+
+    private void persistAnalysisJob(String catalogName, String db, TableName tbl,
+            long jobId) throws DdlException {
+        try {
+            AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+                    jobId).setTaskId(-1)
+                    .setCatalogName(catalogName).setDbName(db)
+                    .setTblName(tbl.getTbl())
+                    .setJobType(JobType.MANUAL)
+                    .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+                    .setScheduleType(ScheduleType.ONCE).build();
+            StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+        } catch (Throwable t) {
+            throw new DdlException(t.getMessage(), t);
+        }
+    }
+
+    private void createTaskForMVIdx(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+            Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos, long jobId) throws DdlException {
+        if (!(analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP))) {
+            return;
+        }
+        OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+        try {
+            olapTable.readLock();
+            for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
+                if (meta.getDefineStmt() == null) {
+                    continue;
+                }
                 long taskId = Env.getCurrentEnv().getNextId();
-                AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
-                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
-                        .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
-                        .setTblName(tbl.getTbl()).setColName(colName)
-                        .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
-                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
-                        .setState(AnalysisState.PENDING)
+                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+                        jobId).setTaskId(taskId)
+                        .setCatalogName(catalogName).setDbName(db)
+                        .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
+                        .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
+                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
                         .setScheduleType(ScheduleType.ONCE).build();
                 try {
-                    StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+                    StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
                 } catch (Exception e) {
-                    throw new RuntimeException("Failed to create analysis job", e);
+                    throw new DdlException("Failed to create analysis task", e);
                 }
                 analysisTaskInfos.put(taskId, analysisTaskInfo);
             }
+        } finally {
+            olapTable.readUnlock();
         }
-        if (analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
-            OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+    }
+
+    private void createTaskForEachColumns(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+            Set<String> colNames, Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos,
+            long jobId) throws DdlException {
+        for (String colName : colNames) {
+            long taskId = Env.getCurrentEnv().getNextId();
+            AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
+            AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+                    .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+                    .setTblName(tbl.getTbl()).setColName(colName)
+                    .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
+                    .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
+                    .setState(AnalysisState.PENDING)
+                    .setScheduleType(ScheduleType.ONCE).build();
             try {
-                olapTable.readLock();
-                for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
-                    if (meta.getDefineStmt() == null) {
-                        continue;
-                    }
-                    long taskId = Env.getCurrentEnv().getNextId();
-                    AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
-                            jobId).setTaskId(taskId)
-                            .setCatalogName(catalogName).setDbName(db)
-                            .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
-                            .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
-                            .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
-                            .setScheduleType(ScheduleType.ONCE).build();
-                    try {
-                        StatisticsRepository.createAnalysisTask(analysisTaskInfo);
-                    } catch (Exception e) {
-                        throw new RuntimeException("Failed to create analysis job", e);
-                    }
-                    analysisTaskInfos.put(taskId, analysisTaskInfo);
-                }
-            } finally {
-                olapTable.readUnlock();
+                StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+            } catch (Exception e) {
+                throw new DdlException("Failed to create analysis task", e);
             }
+            analysisTaskInfos.put(taskId, analysisTaskInfo);
         }
-        if (analyzeStmt.isSync()) {
-            syncExecute(analysisTaskInfos.values());
-            return;
-        }
-        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
     }

     public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
@@ -172,16 +201,23 @@ public class AnalysisManager {
         params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
         params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
         params.put("jobId", String.valueOf(info.jobId));
+        params.put("taskId", String.valueOf(info.taskId));
         try {
             StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
         } catch (Exception e) {
-            LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+            LOG.warn(String.format("Failed to update state for task: %d, %d", info.jobId, info.taskId), e);
         } finally {
             info.state = jobState;
             if (analysisJobIdToTaskMap.get(info.jobId).values()
                     .stream().allMatch(i -> i.state != null
                             && i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) {
                 analysisJobIdToTaskMap.remove(info.jobId);
+                params.put("taskId", String.valueOf(-1));
+                try {
+                    StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
+                } catch (Exception e) {
+                    LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+                }
             }
         }
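To make the two-level update concrete, a sketch with the same hypothetical ids
and table names as above: when the last task of a job leaves PENDING/RUNNING,
updateTaskStatus first updates that task's own row, then reuses the same
parameters with taskId rewritten to -1, so the job-level record takes the last
task's state and exec time:

    UPDATE __internal_schema.analysis_jobs
    SET state = 'FINISHED' , last_exec_time_in_ms=1681375570000
    WHERE job_id = 10001 and task_id=10003;

    UPDATE __internal_schema.analysis_jobs
    SET state = 'FINISHED' , last_exec_time_in_ms=1681375570000
    WHERE job_id = 10001 and task_id=-1;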
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index 9f5f608229..def16de41c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -127,4 +127,8 @@ public class AnalysisTaskInfo {
     public AnalysisState getState() {
         return state;
     }
+
+    public boolean isJob() {
+        return taskId == -1;
+    }
 }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 9199c74b25..c18cb38d0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -198,14 +198,14 @@ public class StatisticsRepository {
         predicate.append(partPredicate);
     }

-    public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
+    public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
         Map<String, String> params = new HashMap<>();
         params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
         params.put("taskId", String.valueOf(analysisTaskInfo.taskId));
         params.put("catalogName", analysisTaskInfo.catalogName);
         params.put("dbName", analysisTaskInfo.dbName);
         params.put("tblName", analysisTaskInfo.tblName);
-        params.put("colName", analysisTaskInfo.colName);
+        params.put("colName", analysisTaskInfo.colName == null ? "" : analysisTaskInfo.colName);
         params.put("indexId", analysisTaskInfo.indexId == null ? "-1" : String.valueOf(analysisTaskInfo.indexId));
         params.put("jobType", analysisTaskInfo.jobType.toString());
         params.put("analysisType", analysisTaskInfo.analysisMethod.toString());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org