This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ee733c0ec44 branch-3.0: [improvement](statistics)Async drop table
stats while doing truncate and schema change. #45923 (#45997)
ee733c0ec44 is described below
commit ee733c0ec44035931a278fbc5570d05efa1e4f52
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 26 16:10:15 2024 +0800
branch-3.0: [improvement](statistics)Async drop table stats while doing
truncate and schema change. #45923 (#45997)
Cherry-picked from #45923
Co-authored-by: James <[email protected]>
---
.../apache/doris/service/FrontendServiceImpl.java | 6 +-
.../apache/doris/statistics/AnalysisManager.java | 64 +++++++++++++---------
.../doris/statistics/AnalysisTaskExecutor.java | 6 +-
.../doris/statistics/StatisticsAutoCollector.java | 2 +-
.../doris/statistics/AnalysisManagerTest.java | 8 +--
.../suites/statistics/analyze_stats.groovy | 20 +++++++
.../suites/statistics/test_analyze_mv.groovy | 17 ++++++
.../statistics/test_drop_stats_and_truncate.groovy | 18 ++++++
8 files changed, 105 insertions(+), 36 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8323540f5d1..f4f10bf331d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -114,6 +114,7 @@ import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdatePartitionStatsTarget;
import org.apache.doris.statistics.query.QueryStats;
+import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
@@ -3399,8 +3400,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
partitionNames = new PartitionNames(false, new
ArrayList<>(target.partitions));
}
if (target.isTruncate) {
- analysisManager.submitAsyncDropStatsTask(target.catalogId,
target.dbId,
- target.tableId, tableStats, partitionNames);
+ TableIf table = StatisticsUtil.findTable(target.catalogId,
target.dbId, target.tableId);
+ analysisManager.submitAsyncDropStatsTask(table, target.catalogId,
target.dbId,
+ target.tableId, tableStats, partitionNames, false);
} else {
analysisManager.invalidateLocalStats(target.catalogId,
target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ece9daf2520..d92179cf2f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -159,12 +159,12 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
- 1, 1, 0,
- TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
- new ThreadPoolExecutor.AbortPolicy(),
+ 1, 3, 10,
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
+ new ThreadPoolExecutor.DiscardPolicy(),
"Drop stats executor", true);
}
}
@@ -696,20 +696,7 @@ public class AnalysisManager implements Writable {
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
- if (!table.isPartitionedTable() || partitionNames == null
- || partitionNames.isStar() ||
partitionNames.getPartitionNames() == null) {
- removeTableStats(tableId);
- Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(tableId));
- }
- submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats,
partitionNames);
- // Drop stats ddl is master only operation.
- Set<String> partitions = null;
- if (partitionNames != null && !partitionNames.isStar() &&
partitionNames.getPartitionNames() != null) {
- partitions = new HashSet<>(partitionNames.getPartitionNames());
- }
- // Drop stats ddl is master only operation.
- invalidateRemoteStats(catalogId, dbId, tableId, null, partitions,
true);
- StatisticsRepository.dropStatistics(catalogId, dbId,
table.getId(), null, partitions);
+ submitAsyncDropStatsTask(table, catalogId, dbId, tableId,
tableStats, partitionNames, true);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
@@ -722,30 +709,55 @@ public class AnalysisManager implements Writable {
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;
+ private final TableIf table;
+ private final boolean isMaster;
- public DropStatsTask(long catalogId, long dbId, long tableId,
Set<String> columns,
- TableStatsMeta tableStats, PartitionNames
partitionNames) {
+ public DropStatsTask(TableIf table, long catalogId, long dbId, long
tableId, Set<String> columns,
+ TableStatsMeta tableStats, PartitionNames
partitionNames, boolean isMaster) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
+ this.table = table;
+ this.isMaster = isMaster;
}
@Override
public void run() {
- invalidateLocalStats(catalogId, dbId, tableId, columns,
tableStats, partitionNames);
+ try {
+ if (isMaster) {
+ if (!table.isPartitionedTable() || partitionNames == null
+ || partitionNames.isStar() ||
partitionNames.getPartitionNames() == null) {
+ removeTableStats(tableId);
+
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(tableId));
+ }
+ // Drop stats ddl is master only operation.
+ Set<String> partitions = null;
+ if (partitionNames != null && !partitionNames.isStar()
+ && partitionNames.getPartitionNames() != null) {
+ partitions = new
HashSet<>(partitionNames.getPartitionNames());
+ }
+ // Drop stats ddl is master only operation.
+ StatisticsRepository.dropStatistics(catalogId, dbId,
tableId, null, partitions);
+ invalidateRemoteStats(catalogId, dbId, tableId, null,
partitions, true);
+ }
+ invalidateLocalStats(catalogId, dbId, tableId, columns,
tableStats, partitionNames);
+ } catch (Throwable t) {
+ LOG.info("Failed to async drop stats for table {}.{}.{},
reason: {}",
+ catalogId, dbId, tableId, t.getMessage());
+ }
}
}
- public void submitAsyncDropStatsTask(long catalogId, long dbId, long
tableId,
- TableStatsMeta tableStats,
PartitionNames partitionNames) {
+ public void submitAsyncDropStatsTask(TableIf table, long catalogId, long
dbId, long tableId,
+ TableStatsMeta tableStats,
PartitionNames partitionNames, boolean isMaster) {
try {
- dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId,
tableId, null, tableStats, partitionNames));
+ dropStatsExecutors.submit(new DropStatsTask(table, catalogId,
dbId, tableId, null,
+ tableStats, partitionNames, isMaster));
} catch (Throwable t) {
- LOG.info("Failed to drop stats for truncate table {}.{}.{}.
Reason:{}",
- catalogId, dbId, tableId, t.getMessage());
+ LOG.info("Failed to submit async drop stats job. reason: {}",
t.getMessage());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 5b87608ba51..bc1126e9c51 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
- this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
+ this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job
Executor");
}
- public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int
taskQueueSize) {
+ public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int
taskQueueSize, String poolName) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor Block Policy",
Integer.MAX_VALUE),
- "Analysis Job Executor", true);
+ poolName, true);
cancelExpiredTask();
} else {
executors = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 5a231db24d3..f4fdc68f55c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
this.analysisTaskExecutor = new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
- StatisticConstants.TASK_QUEUE_CAP);
+ StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job
Executor");
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 898c3af0fdf..703311c850d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -659,11 +659,11 @@ public class AnalysisManagerTest {
AnalysisManager analysisManager = new AnalysisManager();
for (int i = 0; i < 20; i++) {
System.out.println("Submit " + i);
- analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
+ analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null,
null, false);
}
- Thread.sleep(25000);
+ Thread.sleep(10000);
System.out.println(count.get());
- Assertions.assertTrue(count.get() > 10);
- Assertions.assertTrue(count.get() < 20);
+ Assertions.assertTrue(count.get() > 0);
+ Assertions.assertTrue(count.get() <= 20);
}
}
diff --git a/regression-test/suites/statistics/analyze_stats.groovy
b/regression-test/suites/statistics/analyze_stats.groovy
index 3916dd116ad..1078cbf218a 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,22 @@ import java.util.stream.Collectors
suite("test_analyze") {
+ def stats_dropped = { table ->
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) {
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table"""
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped)
+ }
+
String db = "test_analyze"
String tbl = "analyzetestlimited_duplicate_all"
@@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599)
ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name
VARCHAR(256) DEFAULT NULL;
"""
+ stats_dropped("analyze_test_with_schema_update")
+
sql """
ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
"""
@@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599)
def result_before_truncate = sql """show column stats ${tbl}"""
assertEquals(14, result_before_truncate.size())
sql """TRUNCATE TABLE ${tbl}"""
+ stats_dropped(tbl)
def result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
result_after_truncate = sql """show column cached stats ${tbl}"""
@@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599)
assert
"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
[...]
sql """TRUNCATE TABLE ${tbl}"""
+ stats_dropped(tbl)
result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
sql """ANALYZE TABLE ${tbl} WITH SYNC"""
diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy
b/regression-test/suites/statistics/test_analyze_mv.groovy
index 3a8f7335375..daa8d780aee 100644
--- a/regression-test/suites/statistics/test_analyze_mv.groovy
+++ b/regression-test/suites/statistics/test_analyze_mv.groovy
@@ -108,6 +108,22 @@ suite("test_analyze_mv") {
assertTrue(found)
}
+ def stats_dropped = { table ->
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) {
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table"""
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped)
+ }
+
sql """drop database if exists test_analyze_mv"""
sql """create database test_analyze_mv"""
sql """use test_analyze_mv"""
@@ -674,6 +690,7 @@ suite("test_analyze_mv") {
// * Test row count report and report for nereids
sql """truncate table mvTestDup"""
result_row = sql """show index stats mvTestDup mv3"""
+ stats_dropped("mvTestDup")
assertEquals(1, result_row.size())
assertEquals("mvTestDup", result_row[0][0])
assertEquals("mv3", result_row[0][1])
diff --git
a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
index 6dc3c6d1797..31c1e423900 100644
--- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
+++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
@@ -17,6 +17,22 @@
suite("test_drop_stats_and_truncate") {
+ def stats_dropped = { table ->
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) {
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table"""
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped)
+ }
+
sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
@@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(3, columns.size())
sql """truncate table non_part"""
+ stats_dropped("non_part")
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
@@ -148,6 +165,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(9, columns.size())
sql """truncate table part"""
+ stats_dropped("part")
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org