This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4a07efe119c [improvement](statistics)Async drop table stats while doing truncate and schema change(#45923) (#46010)
4a07efe119c is described below

commit 4a07efe119c521817964570fce10344157551228
Author: James <lijib...@selectdb.com>
AuthorDate: Thu Dec 26 21:57:18 2024 +0800

    [improvement](statistics)Async drop table stats while doing truncate and schema change(#45923) (#46010)
    
    backport: https://github.com/apache/doris/pull/45923
---
 .../apache/doris/statistics/AnalysisManager.java   | 57 ++++++++++++++++++----
 .../doris/statistics/AnalysisTaskExecutor.java     |  6 +--
 .../doris/statistics/StatisticsAutoCollector.java  |  2 +-
 .../suites/statistics/analyze_stats.groovy         | 20 ++++++++
 .../suites/statistics/test_analyze_mv.groovy       | 18 +++++++
 .../statistics/test_drop_stats_and_truncate.groovy | 18 +++++++
 6 files changed, 108 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0e4a1c7b42d..c43136a08f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -115,6 +115,7 @@ public class AnalysisManager implements Writable {
     private StatisticsCache statisticsCache;
 
     private AnalysisTaskExecutor taskExecutor;
+    private ThreadPoolExecutor dropStatsExecutors;
 
     // Store task information in metadata.
     protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
@@ -136,8 +137,13 @@ public class AnalysisManager implements Writable {
     public AnalysisManager() {
         if (!Env.isCheckpointThread()) {
             this.taskExecutor = new 
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
-                    Integer.MAX_VALUE);
+                    Integer.MAX_VALUE, "Manual Analysis Job Executor");
             this.statisticsCache = new StatisticsCache();
+            this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
+                    1, 3, 10,
+                    TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
+                    new ThreadPoolExecutor.DiscardPolicy(),
+                    "Drop stats executor", true);
         }
     }
 
@@ -656,19 +662,52 @@ public class AnalysisManager implements Writable {
             long catalogId = table.getDatabase().getCatalog().getId();
             long dbId = table.getDatabase().getId();
             long tableId = table.getId();
-            removeTableStats(tableId);
-            Env.getCurrentEnv().getEditLog().logDeleteTableStats(new 
TableStatsDeletionLog(tableId));
-            Set<String> cols = 
table.getSchemaAllIndexes(false).stream().map(Column::getName)
-                    .collect(Collectors.toSet());
-            invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
-            // Drop stats ddl is master only operation.
-            invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
-            StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, 
table.getId(), cols);
+            asyncDropStatsTask(table, catalogId, dbId, tableId, tableStats);
         } catch (Throwable e) {
             LOG.warn("Failed to drop stats for table {}", table.getName(), e);
         }
     }
 
+    class DropStatsTask implements Runnable {
+        private final long catalogId;
+        private final long dbId;
+        private final long tableId;
+        private final TableStatsMeta tableStats;
+        private final TableIf table;
+
+        public DropStatsTask(TableIf table, long catalogId, long dbId, long 
tableId, TableStatsMeta tableStats) {
+            this.catalogId = catalogId;
+            this.dbId = dbId;
+            this.tableId = tableId;
+            this.tableStats = tableStats;
+            this.table = table;
+        }
+
+        @Override
+        public void run() {
+            try {
+                removeTableStats(tableId);
+                Env.getCurrentEnv().getEditLog().logDeleteTableStats(new 
TableStatsDeletionLog(tableId));
+                Set<String> cols = 
table.getSchemaAllIndexes(false).stream().map(Column::getName)
+                        .collect(Collectors.toSet());
+                StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, 
table.getId(), cols);
+                invalidateLocalStats(catalogId, dbId, tableId, null, 
tableStats);
+                // Drop stats ddl is master only operation.
+                invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
+            } catch (Throwable e) {
+                LOG.warn("Failed to drop stats for table {}", table.getName(), 
e);
+            }
+        }
+    }
+
+    public void asyncDropStatsTask(TableIf table, long catalogId, long dbId, 
long tableId, TableStatsMeta tableStats) {
+        try {
+            dropStatsExecutors.submit(new DropStatsTask(table, catalogId, 
dbId, tableId, tableStats));
+        } catch (Throwable t) {
+            LOG.info("Failed to submit async drop stats job. reason: {}", 
t.getMessage());
+        }
+    }
+
     public void dropCachedStats(long catalogId, long dbId, long tableId) {
         TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
         StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 3db9a862d10..23cfc0b6d11 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
                     
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
     public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
-        this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
+        this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job 
Executor");
     }
 
-    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int 
taskQueueSize) {
+    public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int 
taskQueueSize, String poolName) {
         if (!Env.isCheckpointThread()) {
             executors = ThreadPoolManager.newDaemonThreadPool(
                     simultaneouslyRunningTaskNum,
                     simultaneouslyRunningTaskNum, 0,
                     TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
                     new BlockedPolicy("Analysis Job Executor", 
Integer.MAX_VALUE),
-                    "Analysis Job Executor", true);
+                    poolName, true);
             cancelExpiredTask();
         } else {
             executors = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 574b25da422..cc721720230 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -58,7 +58,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
     public StatisticsAutoCollector() {
         super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10));
         this.analysisTaskExecutor = new 
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
-                StatisticConstants.TASK_QUEUE_CAP);
+                StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job 
Executor");
     }
 
     @Override
diff --git a/regression-test/suites/statistics/analyze_stats.groovy 
b/regression-test/suites/statistics/analyze_stats.groovy
index 7f4b9abee47..6d845a11da4 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,22 @@ import java.util.stream.Collectors
 
 suite("test_analyze") {
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     String db = "test_analyze"
 
     String tbl = "analyzetestlimited_duplicate_all"
@@ -1159,6 +1175,8 @@ PARTITION `p599` VALUES IN (599)
         ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name 
VARCHAR(256) DEFAULT NULL;
     """
 
+    stats_dropped("analyze_test_with_schema_update")
+
     sql """
         ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
     """
@@ -1356,6 +1374,7 @@ PARTITION `p599` VALUES IN (599)
     def result_before_truncate = sql """show column stats ${tbl}"""
     assertEquals(14, result_before_truncate.size())
     sql """TRUNCATE TABLE ${tbl}"""
+    stats_dropped(tbl)
     def result_after_truncate = sql """show column stats ${tbl}"""
     assertEquals(0, result_after_truncate.size())
     result_after_truncate = sql """show column cached stats ${tbl}"""
@@ -1382,6 +1401,7 @@ PARTITION `p599` VALUES IN (599)
     assert 
"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
 [...]
 
     sql """TRUNCATE TABLE ${tbl}"""
+    stats_dropped(tbl)
     result_after_truncate = sql """show column stats ${tbl}"""
     assertEquals(0, result_after_truncate.size())
     sql """ANALYZE TABLE ${tbl} WITH SYNC"""
diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy 
b/regression-test/suites/statistics/test_analyze_mv.groovy
index 06a5de85bef..7e978a0c2b9 100644
--- a/regression-test/suites/statistics/test_analyze_mv.groovy
+++ b/regression-test/suites/statistics/test_analyze_mv.groovy
@@ -108,6 +108,22 @@ suite("test_analyze_mv") {
         assertTrue(found)
     }
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     sql """drop database if exists test_analyze_mv"""
     sql """create database test_analyze_mv"""
     sql """use test_analyze_mv"""
@@ -679,6 +695,7 @@ suite("test_analyze_mv") {
         // Test row count report and report for nereids
         sql """truncate table mvTestDup"""
         result_row = sql """show index stats mvTestDup mv3"""
+        stats_dropped("mvTestDup")
         assertEquals(1, result_row.size())
         assertEquals("mvTestDup", result_row[0][0])
         assertEquals("mv3", result_row[0][1])
@@ -714,6 +731,7 @@ suite("test_analyze_mv") {
 
         // ** Embedded test for skip auto analyze when table is empty again
         sql """analyze table mvTestDup properties ("use.auto.analyzer" = 
"true")"""
+        stats_dropped("mvTestDup")
         empty_test = sql """show auto analyze mvTestDup"""
         assertEquals(0, empty_test.size())
         empty_test = sql """show column stats mvTestDup"""
diff --git 
a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy 
b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
index 969e03cb295..a6a9f4471a4 100644
--- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
+++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
@@ -17,6 +17,22 @@
 
 suite("test_drop_stats_and_truncate") {
 
+    def stats_dropped = { table ->
+        def result1 = sql """show column cached stats $table"""
+        def result2 = sql """show column stats $table"""
+        boolean dropped = false
+        for (int i = 0; i < 120; i++) {
+            if (0 == result1.size() && 0 == result2.size()) {
+                dropped = true;
+                break;
+            }
+            Thread.sleep(1000)
+            result1 = sql """show column cached stats $table"""
+            result2 = sql """show column stats $table"""
+        }
+        assertTrue(dropped)
+    }
+
     sql """drop database if exists test_drop_stats_and_truncate"""
     sql """create database test_drop_stats_and_truncate"""
     sql """use test_drop_stats_and_truncate"""
@@ -100,6 +116,7 @@ suite("test_drop_stats_and_truncate") {
     assertEquals(3, columns.size())
 
     sql """truncate table non_part"""
+    stats_dropped("non_part")
     result = sql """show column stats non_part"""
     assertEquals(0, result.size())
     result = sql """show table stats non_part"""
@@ -147,6 +164,7 @@ suite("test_drop_stats_and_truncate") {
     assertEquals(9, columns.size())
 
     sql """truncate table part"""
+    stats_dropped("part")
     result = sql """show column stats part"""
     assertEquals(0, result.size())
     result = sql """show table stats part"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to