This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cfb6af295f1 [fix](stats) Fix creating too many tasks on new env #27364
cfb6af295f1 is described below
commit cfb6af295f1c1f0e39838efa9e67d9470717a14c
Author: AKIRA <[email protected]>
AuthorDate: Wed Nov 22 17:53:31 2023 +0900
[fix](stats) Fix creating too many tasks on new env #27364
If there exist huge datasets with many databases, many tables, and many
columns, the auto collector might submit too many jobs, which would occupy too
much FE memory.
This PR limits the number of jobs that can be submitted in each round to 5.
---
.../org/apache/doris/statistics/AnalysisJob.java | 33 +++++++++++-----------
.../apache/doris/statistics/AnalysisManager.java | 4 +++
.../doris/statistics/StatisticConstants.java | 2 ++
.../doris/statistics/StatisticsAutoCollector.java | 2 +-
.../doris/statistics/StatisticsCollector.java | 9 ++++--
.../apache/doris/statistics/AnalysisJobTest.java | 8 +++---
6 files changed, 34 insertions(+), 24 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index acea77b1b72..41e4b6b317e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,10 +46,6 @@ public class AnalysisJob {
protected List<ColStatsData> buf;
- protected int totalTaskCount;
-
- protected int queryFinishedTaskCount;
-
protected StmtExecutor stmtExecutor;
protected boolean killed;
@@ -63,10 +60,9 @@ public class AnalysisJob {
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
- this.queryingTask = new HashSet<>(queryingTask);
- this.queryFinished = new HashSet<>();
+ this.queryingTask = Collections.synchronizedSet(new
HashSet<>(queryingTask));
+ this.queryFinished = Collections.synchronizedSet(new HashSet<>());
this.buf = new ArrayList<>();
- totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
this.jobInfo = jobInfo;
this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
@@ -86,12 +82,14 @@ public class AnalysisJob {
}
protected void markOneTaskDone() {
- queryFinishedTaskCount += 1;
- if (queryFinishedTaskCount == totalTaskCount) {
- writeBuf();
- updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
- + (System.currentTimeMillis() - start) / 1000);
- deregisterJob();
+ if (queryingTask.isEmpty()) {
+ try {
+ writeBuf();
+ updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ + (System.currentTimeMillis() - start) / 1000);
+ } finally {
+ deregisterJob();
+ }
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
writeBuf();
}
@@ -175,9 +173,12 @@ public class AnalysisJob {
}
public void taskFailed(BaseAnalysisTask task, String reason) {
- updateTaskState(AnalysisState.FAILED, reason);
- cancel();
- deregisterJob();
+ try {
+ updateTaskState(AnalysisState.FAILED, reason);
+ cancel();
+ } finally {
+ deregisterJob();
+ }
}
public void cancel() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 24c612e101e..8badad39d0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -1071,4 +1071,8 @@ public class AnalysisManager implements Writable {
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
+
+ public boolean hasUnFinished() {
+ return !analysisJobIdToTaskMap.isEmpty();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6f71cd5911..ee07d52d3b2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -94,6 +94,8 @@ public class StatisticConstants {
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
+ public static final int SUBMIT_JOB_LIMIT = 5;
+
static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER +
FeConstants.INTERNAL_DB_NAME);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 08487ff4987..80afcb2c0f1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
-
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
+
TimeUnit.MINUTES.toMillis(Config.full_auto_analyze_simultaneously_running_task_num),
new
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 638db553987..4c77d42cfe0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends
MasterDaemon {
protected final AnalysisTaskExecutor analysisTaskExecutor;
+ protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
public StatisticsCollector(String name, long intervalMs,
AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
@@ -54,8 +55,8 @@ public abstract class StatisticsCollector extends
MasterDaemon {
if (Env.isCheckpointThread()) {
return;
}
-
- if (!analysisTaskExecutor.idle()) {
+ submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
+ if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not
finished, skip");
return;
}
@@ -72,7 +73,9 @@ public abstract class StatisticsCollector extends
MasterDaemon {
// No statistics need to be collected or updated
return;
}
-
+ if (submittedThisRound-- < 0) {
+ return;
+ }
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index d4dedd17123..bca05d8299c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -44,7 +44,9 @@ public class AnalysisJobTest {
}
@Test
- public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked
OlapAnalysisTask olapAnalysisTask) {
+ public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo,
+ @Mocked OlapAnalysisTask olapAnalysisTask,
+ @Mocked OlapAnalysisTask olapAnalysisTask2) {
AtomicInteger writeBufInvokeTimes = new AtomicInteger();
new MockUp<AnalysisJob>() {
@Mock
@@ -63,9 +65,9 @@ public class AnalysisJobTest {
AnalysisJob job = new AnalysisJob(analysisInfo,
Arrays.asList(olapAnalysisTask));
job.queryingTask = new HashSet<>();
job.queryingTask.add(olapAnalysisTask);
+ job.queryingTask.add(olapAnalysisTask2);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
- job.totalTaskCount = 20;
// not all task finished nor cached limit exceed, shouldn't write
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
@@ -97,7 +99,6 @@ public class AnalysisJobTest {
job.queryingTask.add(olapAnalysisTask);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
- job.totalTaskCount = 1;
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// all task finished, should write and deregister this job
@@ -132,7 +133,6 @@ public class AnalysisJobTest {
for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
job.buf.add(colStatsData);
}
- job.totalTaskCount = 100;
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// cache limit exceed, should write them
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]