This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ee733c0ec44 branch-3.0: [improvement](statistics)Async drop table stats while doing truncate and schema change. #45923 (#45997) ee733c0ec44 is described below commit ee733c0ec44035931a278fbc5570d05efa1e4f52 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Dec 26 16:10:15 2024 +0800 branch-3.0: [improvement](statistics)Async drop table stats while doing truncate and schema change. #45923 (#45997) Cherry-picked from #45923 Co-authored-by: James <lijib...@selectdb.com> --- .../apache/doris/service/FrontendServiceImpl.java | 6 +- .../apache/doris/statistics/AnalysisManager.java | 64 +++++++++++++--------- .../doris/statistics/AnalysisTaskExecutor.java | 6 +- .../doris/statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/AnalysisManagerTest.java | 8 +-- .../suites/statistics/analyze_stats.groovy | 20 +++++++ .../suites/statistics/test_analyze_mv.groovy | 17 ++++++ .../statistics/test_drop_stats_and_truncate.groovy | 18 ++++++ 8 files changed, 105 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8323540f5d1..f4f10bf331d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -114,6 +114,7 @@ import org.apache.doris.statistics.StatisticsCacheKey; import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.statistics.UpdatePartitionStatsTarget; import org.apache.doris.statistics.query.QueryStats; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; @@ -3399,8 +3400,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions)); } if (target.isTruncate) { - analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId, - target.tableId, tableStats, partitionNames); + TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId); + analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId, + target.tableId, tableStats, partitionNames, false); } else { analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats, partitionNames); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index ece9daf2520..d92179cf2f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -159,12 +159,12 @@ public class AnalysisManager implements Writable { public AnalysisManager() { if (!Env.isCheckpointThread()) { this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num, - Integer.MAX_VALUE); + Integer.MAX_VALUE, "Manual Analysis Job Executor"); this.statisticsCache = new StatisticsCache(); this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool( - 1, 1, 0, - TimeUnit.DAYS, new LinkedBlockingQueue<>(10), - new ThreadPoolExecutor.AbortPolicy(), + 1, 3, 10, + TimeUnit.DAYS, new LinkedBlockingQueue<>(20), + new ThreadPoolExecutor.DiscardPolicy(), "Drop stats executor", true); } } @@ -696,20 +696,7 @@ public class AnalysisManager implements Writable { long catalogId = table.getDatabase().getCatalog().getId(); long dbId = table.getDatabase().getId(); long tableId = table.getId(); - if (!table.isPartitionedTable() || partitionNames == null - || partitionNames.isStar() || partitionNames.getPartitionNames() == null) { - removeTableStats(tableId); - Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); - } - submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames); - // Drop stats ddl is master only operation. - Set<String> partitions = null; - if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) { - partitions = new HashSet<>(partitionNames.getPartitionNames()); - } - // Drop stats ddl is master only operation. - invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true); - StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions); + submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true); } catch (Throwable e) { LOG.warn("Failed to drop stats for table {}", table.getName(), e); } @@ -722,30 +709,55 @@ public class AnalysisManager implements Writable { private final Set<String> columns; private final TableStatsMeta tableStats; private final PartitionNames partitionNames; + private final TableIf table; + private final boolean isMaster; - public DropStatsTask(long catalogId, long dbId, long tableId, Set<String> columns, - TableStatsMeta tableStats, PartitionNames partitionNames) { + public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set<String> columns, + TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) { this.catalogId = catalogId; this.dbId = dbId; this.tableId = tableId; this.columns = columns; this.tableStats = tableStats; this.partitionNames = partitionNames; + this.table = table; + this.isMaster = isMaster; } @Override public void run() { - invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames); + try { + if (isMaster) { + if (!table.isPartitionedTable() || partitionNames == null + || partitionNames.isStar() || partitionNames.getPartitionNames() == null) { + removeTableStats(tableId); + Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); + } + // Drop stats ddl is master only operation. + Set<String> partitions = null; + if (partitionNames != null && !partitionNames.isStar() + && partitionNames.getPartitionNames() != null) { + partitions = new HashSet<>(partitionNames.getPartitionNames()); + } + // Drop stats ddl is master only operation. + StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions); + invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true); + } + invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames); + } catch (Throwable t) { + LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}", + catalogId, dbId, tableId, t.getMessage()); + } } } - public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId, - TableStatsMeta tableStats, PartitionNames partitionNames) { + public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId, + TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) { try { - dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames)); + dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null, + tableStats, partitionNames, isMaster)); } catch (Throwable t) { - LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}", - catalogId, dbId, tableId, t.getMessage()); + LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 5b87608ba51..bc1126e9c51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -44,17 +44,17 @@ public class AnalysisTaskExecutor { Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) { - this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE); + this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor"); } - public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) { + public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) { if (!Env.isCheckpointThread()) { executors = ThreadPoolManager.newDaemonThreadPool( simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize), new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE), - "Analysis Job Executor", true); + poolName, true); cancelExpiredTask(); } else { executors = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 5a231db24d3..f4fdc68f55c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon { public StatisticsAutoCollector() { super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP); + StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor"); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 898c3af0fdf..703311c850d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -659,11 +659,11 @@ public class AnalysisManagerTest { AnalysisManager analysisManager = new AnalysisManager(); for (int i = 0; i < 20; i++) { System.out.println("Submit " + i); - analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null); + analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null, null, false); } - Thread.sleep(25000); + Thread.sleep(10000); System.out.println(count.get()); - Assertions.assertTrue(count.get() > 10); - Assertions.assertTrue(count.get() < 20); + Assertions.assertTrue(count.get() > 0); + Assertions.assertTrue(count.get() <= 20); } } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 3916dd116ad..1078cbf218a 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -19,6 +19,22 @@ import java.util.stream.Collectors suite("test_analyze") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + String db = "test_analyze" String tbl = "analyzetestlimited_duplicate_all" @@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599) ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL; """ + stats_dropped("analyze_test_with_schema_update") + sql """ ANALYZE TABLE analyze_test_with_schema_update WITH SYNC """ @@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599) def result_before_truncate = sql """show column stats ${tbl}""" assertEquals(14, result_before_truncate.size()) sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) def result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) result_after_truncate = sql """show column cached stats ${tbl}""" @@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599) assert "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 [...] sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) sql """ANALYZE TABLE ${tbl} WITH SYNC""" diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy b/regression-test/suites/statistics/test_analyze_mv.groovy index 3a8f7335375..daa8d780aee 100644 --- a/regression-test/suites/statistics/test_analyze_mv.groovy +++ b/regression-test/suites/statistics/test_analyze_mv.groovy @@ -108,6 +108,22 @@ suite("test_analyze_mv") { assertTrue(found) } + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_analyze_mv""" sql """create database test_analyze_mv""" sql """use test_analyze_mv""" @@ -674,6 +690,7 @@ suite("test_analyze_mv") { // * Test row count report and report for nereids sql """truncate table mvTestDup""" result_row = sql """show index stats mvTestDup mv3""" + stats_dropped("mvTestDup") assertEquals(1, result_row.size()) assertEquals("mvTestDup", result_row[0][0]) assertEquals("mv3", result_row[0][1]) diff --git a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy index 6dc3c6d1797..31c1e423900 100644 --- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy +++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy @@ -17,6 +17,22 @@ suite("test_drop_stats_and_truncate") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_drop_stats_and_truncate""" sql """create database test_drop_stats_and_truncate""" sql """use test_drop_stats_and_truncate""" @@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(3, columns.size()) sql """truncate table non_part""" + stats_dropped("non_part") result = sql """show column stats non_part""" assertEquals(0, result.size()) result = sql """show table stats non_part""" @@ -148,6 +165,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(9, columns.size()) sql """truncate table part""" + stats_dropped("part") result = sql """show column stats part""" assertEquals(0, result.size()) result = sql """show table stats part""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org