This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b72c71dec0 [fix](stats) Analysis jobs didn't get persisted properly (#18602)
b72c71dec0 is described below

commit b72c71dec06f47c1a4a4200eef3a873d6e6c7841
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Thu Apr 13 17:36:06 2023 +0900

    [fix](stats) Analysis jobs didn't get persisted properly (#18602)
    
    In the previous implementation, Doris persisted only one record to track
    the status of an analysis job. After this PR, each column-analysis task is
    persisted, and a record whose task_id is -1 is stored as the job for the
    user-submitted AnalyzeStmt.
    
    AnalyzeStmt <---1-1---> AnalysisJob
    AnalysisJob <---1-n---> AnalysisTask
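
    For illustration, here is a minimal sketch (hypothetical names, not part
    of this commit) of the rows now persisted for ANALYZE TABLE t (c1, c2):
    one job row whose task_id is -1, plus one row per column task.

        // Sketch only: AnalysisRecord is a stand-in for the persisted row.
        import java.util.ArrayList;
        import java.util.List;

        public class PersistenceSketch {
            record AnalysisRecord(long jobId, long taskId, String colName) {}

            public static void main(String[] args) {
                long jobId = 1001L;                           // e.g. from Env.getCurrentEnv().getNextId()
                List<AnalysisRecord> rows = new ArrayList<>();
                rows.add(new AnalysisRecord(jobId, -1, ""));  // the job row: task_id = -1
                long taskId = 1002L;
                for (String col : new String[] {"c1", "c2"}) {
                    rows.add(new AnalysisRecord(jobId, taskId++, col)); // one row per column task
                }
                rows.forEach(System.out::println);
            }
        }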
---
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../doris/catalog/InternalSchemaInitializer.java   |   4 +-
 .../apache/doris/statistics/AnalysisManager.java   | 126 +++++++++++++--------
 .../apache/doris/statistics/AnalysisTaskInfo.java  |   4 +
 .../doris/statistics/StatisticsRepository.java     |   4 +-
 5 files changed, 90 insertions(+), 50 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5bdf74f1fe..0c79634103 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5350,7 +5350,7 @@ public class Env {
     //  1. handle partition level analysis statement properly
     //  2. support sample job
     //  3. support period job
-    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
         analysisManager.createAnalysisJob(analyzeStmt);
     }
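
Since createAnalysisJob now declares a checked DdlException, callers must
handle persistence failures explicitly. A self-contained sketch of the new
failure propagation (all class and method names below are stand-ins; only
the throws-DdlException shape comes from this commit):

    // Stand-in types; models how persistence errors now reach the caller.
    public class CallerSketch {
        static class DdlException extends Exception {
            DdlException(String msg, Throwable cause) { super(msg, cause); }
        }

        // Mirrors the new contract: failures surface as a checked exception.
        static void createAnalysisJob(String analyzeStmt) throws DdlException {
            try {
                throw new RuntimeException("simulated persistence failure");
            } catch (Exception e) {
                throw new DdlException("Failed to create analysis task", e);
            }
        }

        public static void main(String[] args) {
            try {
                createAnalysisJob("ANALYZE TABLE t");
            } catch (DdlException e) {
                System.out.println("reported to client: " + e.getMessage());
            }
        }
    }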
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 6feec883a7..3f421d5a86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -191,11 +191,11 @@ public class InternalSchemaInitializer extends Thread {
         columnDefs.add(new ColumnDef("schedule_type", TypeDef.createVarchar(32)));
         String engineName = "olap";
         KeysDesc keysDesc = new KeysDesc(KeysType.UNIQUE_KEYS,
-                Lists.newArrayList("job_id"));
+                Lists.newArrayList("job_id", "task_id"));
 
         DistributionDesc distributionDesc = new HashDistributionDesc(
                 StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT,
-                Lists.newArrayList("job_id"));
+                Lists.newArrayList("job_id", "task_id"));
         Map<String, String> properties = new HashMap<String, String>() {
             {
                 put("replication_num", 
String.valueOf(Config.statistic_internal_table_replica_num));
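
The key change above is the core of the fix: a UNIQUE KEYS table upserts on
its key columns, so keyed on job_id alone every task row of a job replaced
the previous one, and only a single row survived. A rough map-based analogy
(illustrative only, not Doris code):

    import java.util.HashMap;
    import java.util.Map;

    public class UniqueKeySketch {
        public static void main(String[] args) {
            // A UNIQUE KEYS table behaves like a map upsert on its key columns.
            Map<String, String> keyedByJobId = new HashMap<>();
            keyedByJobId.put("1001", "task 1002");
            keyedByJobId.put("1001", "task 1003");        // replaces the first task row
            System.out.println(keyedByJobId.size());      // 1: only one row per job survived

            Map<String, String> keyedByJobAndTask = new HashMap<>();
            keyedByJobAndTask.put("1001/-1", "job row");  // task_id -1 marks the job itself
            keyedByJobAndTask.put("1001/1002", "task row");
            keyedByJobAndTask.put("1001/1003", "task row");
            System.out.println(keyedByJobAndTask.size()); // 3: every task row persists
        }
    }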
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8eba9eb8fb..ee3c15eae4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -62,7 +62,7 @@ public class AnalysisManager {
 
     private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
             + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
-            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}";

     private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT "
             + "job_id, catalog_name, db_name, tbl_name, col_name, job_type, "
@@ -90,7 +90,8 @@ public class AnalysisManager {
         return statisticsCache;
     }
 
-    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+    // Each analyze stmt corresponds to one analysis job.
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
         String catalogName = analyzeStmt.getCatalogName();
         String db = analyzeStmt.getDBName();
         TableName tbl = analyzeStmt.getTblName();
@@ -99,7 +100,6 @@ public class AnalysisManager {
         Set<String> partitionNames = analyzeStmt.getPartitionNames();
         Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
         long jobId = Env.getCurrentEnv().getNextId();
-
         // If the analysis is not incremental, need to delete existing statistics.
         // we cannot collect histograms incrementally and do not support it
         if (!analyzeStmt.isIncrement && !analyzeStmt.isHistogram) {
@@ -112,58 +112,87 @@ public class AnalysisManager {
             StatisticsRepository.dropStatistics(dbId, tblIds, colNames, partIds);
         }
 
-        if (colNames != null) {
-            for (String colName : colNames) {
+        createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames, partitionNames, analysisTaskInfos, jobId);
+        createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, partitionNames, analysisTaskInfos, jobId);
+        persistAnalysisJob(catalogName, db, tbl, jobId);
+
+        if (analyzeStmt.isSync()) {
+            syncExecute(analysisTaskInfos.values());
+            return;
+        }
+
+        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+    }
+
+    private void persistAnalysisJob(String catalogName, String db, TableName tbl,
+            long jobId) throws DdlException {
+        try {
+            AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+                            jobId).setTaskId(-1)
+                    .setCatalogName(catalogName).setDbName(db)
+                    .setTblName(tbl.getTbl())
+                    .setJobType(JobType.MANUAL)
+                    .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+                    .setScheduleType(ScheduleType.ONCE).build();
+            StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+        } catch (Throwable t) {
+            throw new DdlException(t.getMessage(), t);
+        }
+    }
+
+    private void createTaskForMVIdx(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+            Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos, long jobId) throws DdlException {
+        if (!(analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP))) {
+            return;
+        }
+        OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+        try {
+            olapTable.readLock();
+            for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
+                if (meta.getDefineStmt() == null) {
+                    continue;
+                }
                 long taskId = Env.getCurrentEnv().getNextId();
-                AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
-                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
-                        .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
-                        .setTblName(tbl.getTbl()).setColName(colName)
-                        .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
-                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
-                        .setState(AnalysisState.PENDING)
+                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+                                jobId).setTaskId(taskId)
+                        .setCatalogName(catalogName).setDbName(db)
+                        .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
+                        .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
+                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
                         .setScheduleType(ScheduleType.ONCE).build();
                 try {
-                    StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+                    StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
                 } catch (Exception e) {
-                    throw new RuntimeException("Failed to create analysis job", e);
+                    throw new DdlException("Failed to create analysis task", e);
                 }
                 analysisTaskInfos.put(taskId, analysisTaskInfo);
             }
+        } finally {
+            olapTable.readUnlock();
         }
-        if (analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
-            OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+    }
+
+    private void createTaskForEachColumns(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+            Set<String> colNames, Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos,
+            long jobId) throws DdlException {
+        for (String colName : colNames) {
+            long taskId = Env.getCurrentEnv().getNextId();
+            AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
+            AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+                    .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+                    .setTblName(tbl.getTbl()).setColName(colName)
+                    .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
+                    .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
+                    .setState(AnalysisState.PENDING)
+                    .setScheduleType(ScheduleType.ONCE).build();
             try {
-                olapTable.readLock();
-                for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
-                    if (meta.getDefineStmt() == null) {
-                        continue;
-                    }
-                    long taskId = Env.getCurrentEnv().getNextId();
-                    AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
-                                    jobId).setTaskId(taskId)
-                            .setCatalogName(catalogName).setDbName(db)
-                            .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
-                            .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
-                            .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
-                            .setScheduleType(ScheduleType.ONCE).build();
-                    try {
-                        StatisticsRepository.createAnalysisTask(analysisTaskInfo);
-                    } catch (Exception e) {
-                        throw new RuntimeException("Failed to create analysis job", e);
-                    }
-                    analysisTaskInfos.put(taskId, analysisTaskInfo);
-                }
-            } finally {
-                olapTable.readUnlock();
+                StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+            } catch (Exception e) {
+                throw new DdlException("Failed to create analysis task", e);
             }
+            analysisTaskInfos.put(taskId, analysisTaskInfo);
         }
-        if (analyzeStmt.isSync()) {
-            syncExecute(analysisTaskInfos.values());
-            return;
-        }
-        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
-        analysisTaskInfos.values().forEach(taskScheduler::schedule);
     }
 
     public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
@@ -172,16 +201,23 @@ public class AnalysisManager {
         params.put("message", StringUtils.isNotEmpty(message) ? 
String.format(", message = '%s'", message) : "");
         params.put("updateExecTime", time == -1 ? "" : ", 
last_exec_time_in_ms=" + time);
         params.put("jobId", String.valueOf(info.jobId));
+        params.put("taskId", String.valueOf(info.taskId));
         try {
             StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
         } catch (Exception e) {
-            LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+            LOG.warn(String.format("Failed to update state for task: %d, %d", info.jobId, info.taskId), e);
         } finally {
             info.state = jobState;
             if (analysisJobIdToTaskMap.get(info.jobId).values()
                     .stream().allMatch(i -> i.state != null
                             && i.state != AnalysisState.PENDING && i.state != 
AnalysisState.RUNNING)) {
                 analysisJobIdToTaskMap.remove(info.jobId);
+                params.put("taskId", String.valueOf(-1));
+                try {
+                    StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
+                } catch (Exception e) {
+                    LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+                }
             }
 
         }
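
Putting the updateTaskStatus change together: every task updates its own row,
and the task that drives the job to completion also rolls the job row
(task_id = -1) to the terminal state. A condensed, self-contained sketch of
that flow (the enum and in-memory map are simplified stand-ins):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class JobRollupSketch {
        enum AnalysisState { PENDING, RUNNING, FINISHED, FAILED }

        static final Map<Long, Map<Long, AnalysisState>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();

        static void updateTaskStatus(long jobId, long taskId, AnalysisState state) {
            // Per-task row update, keyed on (job_id, task_id).
            System.out.printf("UPDATE ... SET state = '%s' WHERE job_id = %d and task_id=%d%n",
                    state, jobId, taskId);
            Map<Long, AnalysisState> tasks = analysisJobIdToTaskMap.get(jobId);
            tasks.put(taskId, state);
            boolean allDone = tasks.values().stream()
                    .allMatch(s -> s != AnalysisState.PENDING && s != AnalysisState.RUNNING);
            if (allDone) {
                analysisJobIdToTaskMap.remove(jobId);
                // The last finishing task also updates the job row (task_id = -1).
                System.out.printf("UPDATE ... SET state = '%s' WHERE job_id = %d and task_id=-1%n",
                        state, jobId);
            }
        }

        public static void main(String[] args) {
            Map<Long, AnalysisState> tasks = new ConcurrentHashMap<>();
            tasks.put(1002L, AnalysisState.RUNNING);
            tasks.put(1003L, AnalysisState.RUNNING);
            analysisJobIdToTaskMap.put(1001L, tasks);
            updateTaskStatus(1001L, 1002L, AnalysisState.FINISHED); // job row untouched
            updateTaskStatus(1001L, 1003L, AnalysisState.FINISHED); // job row rolled up too
        }
    }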
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index 9f5f608229..def16de41c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -127,4 +127,8 @@ public class AnalysisTaskInfo {
     public AnalysisState getState() {
         return state;
     }
+
+    public boolean isJob() {
+        return taskId == -1;
+    }
 }
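
A hedged example of how readers of the persisted rows might use the new
marker (Row is a stand-in for AnalysisTaskInfo; only the taskId == -1
convention comes from this commit):

    import java.util.List;

    public class IsJobSketch {
        record Row(long jobId, long taskId) {
            boolean isJob() {
                return taskId == -1;   // the job row carries task_id = -1
            }
        }

        public static void main(String[] args) {
            List<Row> rows = List.of(new Row(1001, -1), new Row(1001, 1002), new Row(1001, 1003));
            long jobs = rows.stream().filter(Row::isJob).count();
            long tasks = rows.stream().filter(r -> !r.isJob()).count();
            System.out.println(jobs + " job row, " + tasks + " task rows");
        }
    }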
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 9199c74b25..c18cb38d0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -198,14 +198,14 @@ public class StatisticsRepository {
         predicate.append(partPredicate);
     }
 
-    public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
+    public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
         Map<String, String> params = new HashMap<>();
         params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
         params.put("taskId", String.valueOf(analysisTaskInfo.taskId));
         params.put("catalogName", analysisTaskInfo.catalogName);
         params.put("dbName", analysisTaskInfo.dbName);
         params.put("tblName", analysisTaskInfo.tblName);
-        params.put("colName", analysisTaskInfo.colName);
+        params.put("colName", analysisTaskInfo.colName == null ? "" : 
analysisTaskInfo.colName);
         params.put("indexId", analysisTaskInfo.indexId == null ? "-1" : 
String.valueOf(analysisTaskInfo.indexId));
         params.put("jobType", analysisTaskInfo.jobType.toString());
         params.put("analysisType", analysisTaskInfo.analysisMethod.toString());

