This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec0cedab51 [opt](stats) Use single connect context for each olap analyze task
ec0cedab51 is described below

commit ec0cedab5174bf6bb3424e2796595f7d7619b160
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Thu Aug 10 15:04:28 2023 +0800

    [opt](stats) Use single connect context for each olap analyze task
    
    1. Add some comments
    2. Fix potential NPE caused by deleting a running analyze job
    3. Use single connect context for each olap analyze task
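
For context on items 2 and 3: the NPE fix is a null check on the job fetched from analysisJobIdToTaskMap's companion map analysisJobInfoMap (a job can be dropped while its tasks are still running), and the connect-context change makes OlapAnalysisTask.execSQLs open one context per task and reuse it for every partition and column SQL, instead of building a fresh context per statement. The snippet below is a minimal, illustrative JDBC analogue of that per-task reuse pattern, not the Doris-internal API (the real code uses StatisticsUtil.buildConnectContext() and StmtExecutor, as the diff shows); the FE address, credentials, and the killed flag are placeholders, and a MySQL-protocol JDBC driver is assumed on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Arrays;
    import java.util.List;

    public class SingleContextPerTaskSketch {
        // Placeholder for the task's cancellation flag.
        private volatile boolean killed = false;

        // One connection (the analogue of one ConnectContext) is opened per task
        // and reused for every analyze SQL, instead of one connection per SQL.
        public void execSQLs(List<String> sqls) throws Exception {
            long startTime = System.currentTimeMillis();
            // Placeholder FE address; stands in for StatisticsUtil.buildConnectContext().
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://127.0.0.1:9030/__internal_schema", "root", "")) {
                for (String sql : sqls) {
                    if (killed) {
                        return; // Task was cancelled; stop issuing statements.
                    }
                    try (Statement stmt = conn.createStatement()) {
                        // Analogue of StmtExecutor.execute() plus the error-state check.
                        stmt.execute(sql);
                    }
                }
            } finally {
                System.out.println("Analyze SQLs cost "
                        + (System.currentTimeMillis() - startTime) + "ms");
            }
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical per-partition SQLs plus one column-level SQL, as built by doExecute().
            new SingleContextPerTaskSketch().execSQLs(Arrays.asList("SELECT 1", "SELECT 2"));
        }
    }

The point mirrored here is the same as in the diff below: context setup and teardown happen once per task, and cancellation is checked between statements.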
---
 .../apache/doris/statistics/AnalysisManager.java   | 14 +++++--
 .../apache/doris/statistics/OlapAnalysisTask.java  | 44 ++++++++++------------
 .../doris/statistics/AnalysisTaskExecutorTest.java |  2 +-
 3 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cb4d9eb034..b5c6ebf602 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements Writable {
 
     private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
 
-    private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
+    // Tracking running manually submitted async tasks, keep in mem only
+    private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
 
     private StatisticsCache statisticsCache;
 
     private AnalysisTaskExecutor taskExecutor;
 
+    // Store task information in metadata.
     private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
+
+    // Store job information in metadata
     private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
 
+    // Tracking system submitted job, keep in mem only
     private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
 
+    // Tracking and control sync analyze tasks, keep in mem only
     private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
 
     private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
@@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements Writable {
         }
         info.lastExecTimeInMs = time;
         AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
+        // Job may get deleted during execution.
+        if (job == null) {
+            return null;
+        }
         // Synchronize the job state change in job level.
         synchronized (job) {
             job.lastExecTimeInMs = time;
@@ -333,8 +343,6 @@ public class AnalysisManager extends Daemon implements Writable {
         if (!isSync) {
             persistAnalysisJob(jobInfo);
             analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
-        }
-        if (!isSync) {
             try {
                 updateTableStats(jobInfo);
             } catch (Throwable e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 257708de54..71b1191565 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -67,7 +67,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         params.put("colName", String.valueOf(info.colName));
         params.put("tblName", String.valueOf(info.tblName));
         params.put("sampleExpr", getSampleExpression());
-        List<String> partitionAnalysisSQLs = new ArrayList<>();
+        List<String> sqls = new ArrayList<>();
         try {
             tbl.readLock();
             Set<String> partNames = info.colToPartitions.get(info.colName);
@@ -80,46 +80,40 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
                 // Avoid error when get the default partition
                 params.put("partName", "`" + partName + "`");
                 StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-                partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+                sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
             }
         } finally {
             tbl.readUnlock();
         }
-        execSQLs(partitionAnalysisSQLs);
         params.remove("partId");
         params.put("type", col.getType().toString());
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
-        execSQL(sql);
+        sqls.add(sql);
+        execSQLs(sqls);
     }
 
     @VisibleForTesting
-    public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
-        for (String sql : partitionAnalysisSQLs) {
-            execSQL(sql);
-        }
-    }
-
-    @VisibleForTesting
-    public void execSQL(String sql) throws Exception {
-        if (killed) {
-            return;
-        }
+    public void execSQLs(List<String> sqls) throws Exception {
         long startTime = System.currentTimeMillis();
-        LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
         try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
             r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                        info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+            for (String sql : sqls) {
+                if (killed) {
+                    return;
+                }
+                LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
+                stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                r.connectContext.setExecutor(stmtExecutor);
+                stmtExecutor.execute();
+                QueryState queryState = r.connectContext.getState();
+                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
+                            info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+                }
             }
         } finally {
-            LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
+            LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
         }
     }
-
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 7bbaf9b902..574c96c73d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -89,7 +89,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
 
         new MockUp<OlapAnalysisTask>() {
             @Mock
-            public void execSQL(String sql) throws Exception {
+            public void execSQLs(List<String> sqls) throws Exception {
             }
         };
 


