This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ee733c0ec44 branch-3.0: [improvement](statistics)Async drop table stats while doing truncate and schema change. #45923 (#45997)
ee733c0ec44 is described below

commit ee733c0ec44035931a278fbc5570d05efa1e4f52
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 26 16:10:15 2024 +0800

    branch-3.0: [improvement](statistics)Async drop table stats while doing truncate and schema change. #45923 (#45997)
    
    Cherry-picked from #45923
    
    Co-authored-by: James <lijib...@selectdb.com>
---
 .../apache/doris/service/FrontendServiceImpl.java  |  6 +-
 .../apache/doris/statistics/AnalysisManager.java   | 64 +++++++++++++---------
 .../doris/statistics/AnalysisTaskExecutor.java     |  6 +-
 .../doris/statistics/StatisticsAutoCollector.java  |  2 +-
 .../doris/statistics/AnalysisManagerTest.java      |  8 +--
 .../suites/statistics/analyze_stats.groovy         | 20 +++++++
 .../suites/statistics/test_analyze_mv.groovy       | 17 ++++++
 .../statistics/test_drop_stats_and_truncate.groovy | 18 ++++++
 8 files changed, 105 insertions(+), 36 deletions(-)
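
In short: dropping table statistics during TRUNCATE and schema change is now handed off to a small bounded daemon pool instead of running inline on the DDL path. The sketch below is a minimal, self-contained illustration of that hand-off pattern using plain java.util.concurrent; it is not Doris code (the real pool is built via ThreadPoolManager.newDaemonThreadPool), but the 1 core / 3 max threads, 20-slot queue, and DiscardPolicy mirror the values in the AnalysisManager diff below.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class AsyncDropStatsSketch {
        // Bounded pool: excess cleanup tasks are silently discarded rather than
        // blocking the DDL thread or throwing RejectedExecutionException.
        private static final ThreadPoolExecutor DROP_STATS_EXECUTOR = new ThreadPoolExecutor(
                1, 3, 10, TimeUnit.DAYS,
                new LinkedBlockingQueue<>(20),
                new ThreadPoolExecutor.DiscardPolicy());

        public static void main(String[] args) throws InterruptedException {
            // Stand-in for submitAsyncDropStatsTask(...): the caller returns immediately,
            // and the (placeholder) cleanup runs later on the executor's threads.
            DROP_STATS_EXECUTOR.submit(() -> System.out.println("dropping stats asynchronously"));
            DROP_STATS_EXECUTOR.shutdown();
            DROP_STATS_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

Because of DiscardPolicy, a burst of truncates can drop some cleanup tasks on the floor, which is why the updated AnalysisManagerTest below only asserts that at least one and at most all 20 submitted tasks ran.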

diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8323540f5d1..f4f10bf331d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -114,6 +114,7 @@ import org.apache.doris.statistics.StatisticsCacheKey;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.UpdatePartitionStatsTarget;
 import org.apache.doris.statistics.query.QueryStats;
+import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
@@ -3399,8 +3400,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions));
         }
         if (target.isTruncate) {
-            analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId,
-                    target.tableId, tableStats, partitionNames);
+            TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId);
+            analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId,
+                    target.tableId, tableStats, partitionNames, false);
         } else {
             analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
                     target.columns, tableStats, partitionNames);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ece9daf2520..d92179cf2f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -159,12 +159,12 @@ public class AnalysisManager implements Writable {
     public AnalysisManager() {
         if (!Env.isCheckpointThread()) {
             this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
-                    Integer.MAX_VALUE);
+                    Integer.MAX_VALUE, "Manual Analysis Job Executor");
             this.statisticsCache = new StatisticsCache();
             this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
-                1, 1, 0,
-                TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
-                new ThreadPoolExecutor.AbortPolicy(),
+                1, 3, 10,
+                TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
+                new ThreadPoolExecutor.DiscardPolicy(),
                 "Drop stats executor", true);
         }
     }
@@ -696,20 +696,7 @@ public class AnalysisManager implements Writable {
             long catalogId = table.getDatabase().getCatalog().getId();
             long dbId = table.getDatabase().getId();
             long tableId = table.getId();
-            if (!table.isPartitionedTable() || partitionNames == null
-                    || partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
-                removeTableStats(tableId);
-                Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
-            }
-            submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames);
-            // Drop stats ddl is master only operation.
-            Set<String> partitions = null;
-            if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
-                partitions = new HashSet<>(partitionNames.getPartitionNames());
-            }
-            // Drop stats ddl is master only operation.
-            invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
-            StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions);
+            submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true);
         } catch (Throwable e) {
             LOG.warn("Failed to drop stats for table {}", table.getName(), e);
         }
@@ -722,30 +709,55 @@ public class AnalysisManager implements Writable {
         private final Set<String> columns;
         private final TableStatsMeta tableStats;
         private final PartitionNames partitionNames;
+        private final TableIf table;
+        private final boolean isMaster;
 
-        public DropStatsTask(long catalogId, long dbId, long tableId, Set<String> columns,
-                             TableStatsMeta tableStats, PartitionNames partitionNames) {
+        public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set<String> columns,
+                             TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
             this.catalogId = catalogId;
             this.dbId = dbId;
             this.tableId = tableId;
             this.columns = columns;
             this.tableStats = tableStats;
             this.partitionNames = partitionNames;
+            this.table = table;
+            this.isMaster = isMaster;
         }
 
         @Override
         public void run() {
-            invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
+            try {
+                if (isMaster) {
+                    if (!table.isPartitionedTable() || partitionNames == null
+                            || partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
+                        removeTableStats(tableId);
+                        Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
+                    }
+                    // Drop stats ddl is master only operation.
+                    Set<String> partitions = null;
+                    if (partitionNames != null && !partitionNames.isStar()
+                            && partitionNames.getPartitionNames() != null) {
+                        partitions = new HashSet<>(partitionNames.getPartitionNames());
+                    }
+                    // Drop stats ddl is master only operation.
+                    StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions);
+                    invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
+                }
+                invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
+            } catch (Throwable t) {
+                LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}",
+                        catalogId, dbId, tableId, t.getMessage());
+            }
         }
     }
 
-    public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId,
-                                         TableStatsMeta tableStats, PartitionNames partitionNames) {
+    public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId,
+                                         TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
         try {
-            dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames));
+            dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null,
+                    tableStats, partitionNames, isMaster));
         } catch (Throwable t) {
-            LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}",
-                    catalogId, dbId, tableId, t.getMessage());
+            LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage());
         }
     }
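
The run() method above splits the cleanup by role: only the master removes the table-level stats meta (when the whole table is being cleared), drops the stored statistics, and broadcasts the remote invalidation, while every frontend clears its own local cache; any failure is logged and never propagated back to the DDL. A stripped-down sketch of that control flow, with placeholder methods standing in for the real AnalysisManager and StatisticsRepository calls (not Doris code):

    class DropStatsTaskSketch implements Runnable {
        private final boolean isMaster;

        DropStatsTaskSketch(boolean isMaster) {
            this.isMaster = isMaster;
        }

        @Override
        public void run() {
            try {
                if (isMaster) {
                    // Master-only: persistent deletion plus remote invalidation.
                    dropPersistedStatistics();
                    invalidateRemoteStats();
                }
                // Every node clears its own in-memory statistics cache.
                invalidateLocalStats();
            } catch (Throwable t) {
                // Best effort: async cleanup failures are logged, not rethrown.
                System.err.println("Failed to async drop stats: " + t.getMessage());
            }
        }

        private void dropPersistedStatistics() { /* placeholder */ }

        private void invalidateRemoteStats() { /* placeholder */ }

        private void invalidateLocalStats() { /* placeholder */ }

        public static void main(String[] args) {
            new DropStatsTaskSketch(true).run();   // master path
            new DropStatsTaskSketch(false).run();  // follower path: local invalidation only
        }
    }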
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 5b87608ba51..bc1126e9c51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
                     Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
     public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
-        this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
+        this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor");
     }
 
-    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) {
+    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) {
         if (!Env.isCheckpointThread()) {
             executors = ThreadPoolManager.newDaemonThreadPool(
                     simultaneouslyRunningTaskNum,
                     simultaneouslyRunningTaskNum, 0,
                     TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
                     new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE),
-                    "Analysis Job Executor", true);
+                    poolName, true);
             cancelExpiredTask();
         } else {
             executors = null;
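
The only change in this file is threading a pool name through to the underlying executor, so the manual, auto, and drop-stats pools are distinguishable in thread dumps and metrics. A generic illustration of what a named pool buys you (the ThreadFactory here is a stand-in, not ThreadPoolManager's implementation; only the pool names are taken from the diff):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedPoolSketch {
        // Threads come out as "<poolName>-0", "<poolName>-1", ... which is what makes
        // the separate analysis pools identifiable in a jstack dump.
        static ThreadFactory named(String poolName) {
            AtomicInteger idx = new AtomicInteger();
            return runnable -> {
                Thread t = new Thread(runnable, poolName + "-" + idx.getAndIncrement());
                t.setDaemon(true);
                return t;
            };
        }

        public static void main(String[] args) {
            ExecutorService manual = Executors.newFixedThreadPool(2, named("Manual Analysis Job Executor"));
            ExecutorService auto = Executors.newFixedThreadPool(2, named("Auto Analysis Job Executor"));
            manual.submit(() -> System.out.println(Thread.currentThread().getName()));
            auto.submit(() -> System.out.println(Thread.currentThread().getName()));
            manual.shutdown();
            auto.shutdown();
        }
    }
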
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 5a231db24d3..f4fdc68f55c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
     public StatisticsAutoCollector() {
         super("Automatic Analyzer", 
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
         this.analysisTaskExecutor = new 
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
-                StatisticConstants.TASK_QUEUE_CAP);
+                StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job 
Executor");
     }
 
     @Override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 898c3af0fdf..703311c850d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -659,11 +659,11 @@ public class AnalysisManagerTest {
         AnalysisManager analysisManager = new AnalysisManager();
         for (int i = 0; i < 20; i++) {
             System.out.println("Submit " + i);
-            analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
+            analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null, null, false);
         }
-        Thread.sleep(25000);
+        Thread.sleep(10000);
         System.out.println(count.get());
-        Assertions.assertTrue(count.get() > 10);
-        Assertions.assertTrue(count.get() < 20);
+        Assertions.assertTrue(count.get() > 0);
+        Assertions.assertTrue(count.get() <= 20);
     }
 }
diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy
index 3916dd116ad..1078cbf218a 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,22 @@ import java.util.stream.Collectors
 
 suite("test_analyze") {
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     String db = "test_analyze"
 
     String tbl = "analyzetestlimited_duplicate_all"
@@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599)
         ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL;
     """
 
+    stats_dropped("analyze_test_with_schema_update")
+
     sql """
         ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
     """
@@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599)
     def result_before_truncate = sql """show column stats ${tbl}"""
     assertEquals(14, result_before_truncate.size())
     sql """TRUNCATE TABLE ${tbl}"""
+    stats_dropped(tbl)
     def result_after_truncate = sql """show column stats ${tbl}"""
     assertEquals(0, result_after_truncate.size())
     result_after_truncate = sql """show column cached stats ${tbl}"""
@@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599)
     assert 
"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
 [...]
 
     sql """TRUNCATE TABLE ${tbl}"""
+    stats_dropped(tbl)
     result_after_truncate = sql """show column stats ${tbl}"""
     assertEquals(0, result_after_truncate.size())
     sql """ANALYZE TABLE ${tbl} WITH SYNC"""
diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy b/regression-test/suites/statistics/test_analyze_mv.groovy
index 3a8f7335375..daa8d780aee 100644
--- a/regression-test/suites/statistics/test_analyze_mv.groovy
+++ b/regression-test/suites/statistics/test_analyze_mv.groovy
@@ -108,6 +108,22 @@ suite("test_analyze_mv") {
         assertTrue(found)
     }
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     sql """drop database if exists test_analyze_mv"""
     sql """create database test_analyze_mv"""
     sql """use test_analyze_mv"""
@@ -674,6 +690,7 @@ suite("test_analyze_mv") {
     // * Test row count report and report for nereids
     sql """truncate table mvTestDup"""
     result_row = sql """show index stats mvTestDup mv3"""
+    stats_dropped("mvTestDup")
     assertEquals(1, result_row.size())
     assertEquals("mvTestDup", result_row[0][0])
     assertEquals("mv3", result_row[0][1])
diff --git a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
index 6dc3c6d1797..31c1e423900 100644
--- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
+++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
@@ -17,6 +17,22 @@
 
 suite("test_drop_stats_and_truncate") {
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     sql """drop database if exists test_drop_stats_and_truncate"""
     sql """create database test_drop_stats_and_truncate"""
     sql """use test_drop_stats_and_truncate"""
@@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") {
     assertEquals(3, columns.size())
 
     sql """truncate table non_part"""
+    stats_dropped("non_part")
     result = sql """show column stats non_part"""
     assertEquals(0, result.size())
     result = sql """show table stats non_part"""
@@ -148,6 +165,7 @@ suite("test_drop_stats_and_truncate") {
     assertEquals(9, columns.size())
 
     sql """truncate table part"""
+    stats_dropped("part")
     result = sql """show column stats part"""
     assertEquals(0, result.size())
     result = sql """show table stats part"""

