This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2e4aec48dea branch-3.1: [fix](Export) Enhance `removeOldExportJobs`
Logic #47604 (#51935)
2e4aec48dea is described below
commit 2e4aec48deaa1f0a4fece2a2487ac8c2f9e8f4c2
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 18:16:41 2025 +0800
branch-3.1: [fix](Export) Enhance `removeOldExportJobs` Logic #47604
(#51935)
bp #47604
Co-authored-by: Tiewei Fang <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 6 +++
.../main/java/org/apache/doris/catalog/Env.java | 11 ++++++
.../main/java/org/apache/doris/load/ExportMgr.java | 18 +++++++++
.../apache/doris/load/loadv2/ExportMgrTest.java | 46 ++++++++++++++++++++++
4 files changed, 81 insertions(+)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6f2d8d06de3..6925a2d59c1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -198,6 +198,12 @@ public class Config extends ConfigBase {
"For ALTER, EXPORT jobs, remove the finished job if expired."})
public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。",
+ "For EXPORT jobs, If the number of EXPORT jobs in the system
exceeds this value, "
+ + "the oldest records will be deleted."})
+ public static int max_export_history_job_num = 1000;
+
@ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息",
"The clean interval of transaction, in seconds. "
+ "In each cycle, the expired history transaction will be
cleaned"})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index eeb86251613..90fbb1fad34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -164,6 +164,7 @@ import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.GroupCommitManager;
import org.apache.doris.load.Load;
@@ -2540,6 +2541,16 @@ public class Env {
long curTime = System.currentTimeMillis();
List<ExportJob> jobs = exportMgr.getJobs().stream().filter(t ->
!t.isExpired(curTime))
.collect(Collectors.toList());
+ if (jobs.size() > Config.max_export_history_job_num) {
+ jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs));
+ Iterator<ExportJob> iterator = jobs.iterator();
+ while (jobs.size() > Config.max_export_history_job_num &&
iterator.hasNext()) {
+ ExportJob job = iterator.next();
+ if (job.getState() == ExportJobState.FINISHED ||
job.getState() == ExportJobState.CANCELLED) {
+ iterator.remove();
+ }
+ }
+ }
int size = jobs.size();
checksum ^= size;
dos.writeInt(size);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index b798c053186..89baa252d46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -52,6 +52,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -462,6 +463,23 @@ public class ExportMgr {
}
}
}
+
+ if (exportIdToJob.size() > Config.max_export_history_job_num) {
+ List<Map.Entry<Long, ExportJob>> jobList = new
ArrayList<>(exportIdToJob.entrySet());
+ jobList.sort(Comparator.comparingLong(entry ->
entry.getValue().getCreateTimeMs()));
+ while (exportIdToJob.size() >
Config.max_export_history_job_num) {
+ // Remove the oldest job
+ Map.Entry<Long, ExportJob> oldestEntry = jobList.remove(0);
+ exportIdToJob.remove(oldestEntry.getKey());
+ Map<String, Long> labelJobs =
dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId());
+ if (labelJobs != null) {
+ labelJobs.remove(oldestEntry.getValue().getLabel());
+ if (labelJobs.isEmpty()) {
+
dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId());
+ }
+ }
+ }
+ }
} finally {
writeUnlock();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
index 448e7608a7b..38549537fbc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
@@ -19,9 +19,11 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.Config;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.MockedAuth;
@@ -31,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -76,6 +79,49 @@ public class ExportMgrTest {
}
+    /**
+     * Exercises {@code removeOldExportJobs} in two phases. First, ten FINISHED jobs created 1..10 days
+     * ago are added and 7 are expected to remain (presumably the age-based expiry; confirm against
+     * {@code ExportJob.isExpired}). Second, 1000 more recent FINISHED jobs are added and exactly 1000
+     * jobs are expected to remain, namely the most recently created ones (ids 11..1010).
+     */
+    @Test
+    public void testRemoveOldExportJobs() {
+        // Setup: Create jobs with different creation times
+        long currentTime = System.currentTimeMillis();
+        for (int i = 1; i <= 10; i++) {
+            ExportJob job = makeExportJob(i, "label" + i);
+            // Jobs created 1, 2...10 days ago; long literals keep the millisecond offset out of int range
+            Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 24L * 3600L * 1000L));
+            Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
+            exportMgr.unprotectAddJob(job);
+        }
+
+        // Invoke the method
+        exportMgr.removeOldExportJobs();
+
+        // Assertions: the number of remaining jobs must respect the job-count cap
+        List<ExportJob> remainingJobs = exportMgr.getJobs();
+        Assert.assertTrue(remainingJobs.size() <= Config.max_export_history_job_num);
+        Assert.assertEquals(7, remainingJobs.size()); // Expecting 7 jobs to remain
+
+        for (int i = 11; i <= 1010; i++) {
+            ExportJob job = makeExportJob(i, "label" + i);
+            // Jobs created 11, 12...1010 seconds ago
+            Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 1000L));
+            Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
+            exportMgr.unprotectAddJob(job);
+        }
+
+        // Invoke the method
+        exportMgr.removeOldExportJobs();
+
+        // Assertions: the job-count cap must hold after the second batch
+        remainingJobs = exportMgr.getJobs();
+        Assert.assertTrue(remainingJobs.size() <= Config.max_export_history_job_num);
+        Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 jobs to remain
+
+        // check the created time: sorted oldest first, the survivors must be ids 1010 down to 11
+        remainingJobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs));
+        for (int i = 0; i < remainingJobs.size(); ++i) {
+            Assert.assertEquals(1010 - i, remainingJobs.get(i).getId());
+        }
+    }
+
+
private ExportJob makeExportJob(long id, String label) {
ExportJob job1 = new ExportJob(id);
Deencapsulation.setField(job1, "label", label);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]