This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2e4aec48dea branch-3.1: [fix](Export) Enhance `removeOldExportJobs` 
Logic #47604 (#51935)
2e4aec48dea is described below

commit 2e4aec48deaa1f0a4fece2a2487ac8c2f9e8f4c2
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 18:16:41 2025 +0800

    branch-3.1: [fix](Export) Enhance `removeOldExportJobs` Logic #47604 
(#51935)
    
    bp #47604
    
    Co-authored-by: Tiewei Fang <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  6 +++
 .../main/java/org/apache/doris/catalog/Env.java    | 11 ++++++
 .../main/java/org/apache/doris/load/ExportMgr.java | 18 +++++++++
 .../apache/doris/load/loadv2/ExportMgrTest.java    | 46 ++++++++++++++++++++++
 4 files changed, 81 insertions(+)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6f2d8d06de3..6925a2d59c1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -198,6 +198,12 @@ public class Config extends ConfigBase {
             "For ALTER, EXPORT jobs, remove the finished job if expired."})
     public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。",
+            "For EXPORT jobs, If the number of EXPORT jobs in the system 
exceeds this value, "
+                    + "the oldest records will be deleted."})
+    public static int max_export_history_job_num = 1000;
+
     @ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息",
             "The clean interval of transaction, in seconds. "
                     + "In each cycle, the expired history transaction will be 
cleaned"})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index eeb86251613..90fbb1fad34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -164,6 +164,7 @@ import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.GroupCommitManager;
 import org.apache.doris.load.Load;
@@ -2540,6 +2541,16 @@ public class Env {
         long curTime = System.currentTimeMillis();
         List<ExportJob> jobs = exportMgr.getJobs().stream().filter(t -> 
!t.isExpired(curTime))
                 .collect(Collectors.toList());
+        if (jobs.size() > Config.max_export_history_job_num) {
+            jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs));
+            Iterator<ExportJob> iterator = jobs.iterator();
+            while (jobs.size() > Config.max_export_history_job_num && 
iterator.hasNext()) {
+                ExportJob job = iterator.next();
+                if (job.getState() == ExportJobState.FINISHED || 
job.getState() == ExportJobState.CANCELLED) {
+                    iterator.remove();
+                }
+            }
+        }
         int size = jobs.size();
         checksum ^= size;
         dos.writeInt(size);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index b798c053186..89baa252d46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -52,6 +52,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -462,6 +463,23 @@ public class ExportMgr {
                     }
                 }
             }
+
+            if (exportIdToJob.size() > Config.max_export_history_job_num) {
+                List<Map.Entry<Long, ExportJob>> jobList = new 
ArrayList<>(exportIdToJob.entrySet());
+                jobList.sort(Comparator.comparingLong(entry -> 
entry.getValue().getCreateTimeMs()));
+                while (exportIdToJob.size() > 
Config.max_export_history_job_num) {
+                    // Remove the oldest job
+                    Map.Entry<Long, ExportJob> oldestEntry = jobList.remove(0);
+                    exportIdToJob.remove(oldestEntry.getKey());
+                    Map<String, Long> labelJobs = 
dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId());
+                    if (labelJobs != null) {
+                        labelJobs.remove(oldestEntry.getValue().getLabel());
+                        if (labelJobs.isEmpty()) {
+                            
dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId());
+                        }
+                    }
+                }
+            }
         } finally {
             writeUnlock();
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
index 448e7608a7b..38549537fbc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java
@@ -19,9 +19,11 @@ package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.MockedAuth;
@@ -31,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 
@@ -76,6 +79,49 @@ public class ExportMgrTest {
 
     }
 
+    @Test
+    public void testRemoveOldExportJobs() {
+        // Setup: Create jobs with different creation times
+        long currentTime = System.currentTimeMillis();
+        for (int i = 1; i <= 10; i++) {
+            ExportJob job = makeExportJob(i, "label" + i);
+            // Jobs created 1, 2...10 days ago
+            Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 
24 * 3600 * 1000));
+            Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
+            exportMgr.unprotectAddJob(job);
+        }
+
+        // Invoke the method
+        exportMgr.removeOldExportJobs();
+
+        // Assertions: Check the number of jobs remaining
+        List<ExportJob> remainingJobs = exportMgr.getJobs();
+        Assert.assertTrue(remainingJobs.size() <= 
Config.history_job_keep_max_second);
+        Assert.assertEquals(7, remainingJobs.size()); // Expecting 7 jobs to 
remain
+
+
+        for (int i = 11; i <= 1010; i++) {
+            ExportJob job = makeExportJob(i, "label" + i);
+            // Jobs created 11, 12, ..., 1010 seconds ago
+            Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 
1000));
+            Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
+            exportMgr.unprotectAddJob(job);
+        }
+
+        // Invoke the method
+        exportMgr.removeOldExportJobs();
+        // Assertions: Check the number of jobs remaining
+        remainingJobs = exportMgr.getJobs();
+        Assert.assertTrue(remainingJobs.size() <= 
Config.history_job_keep_max_second);
+        Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 
jobs to remain
+
+        // check the created time
+        remainingJobs.sort(Comparator.comparingLong(entry -> 
entry.getCreateTimeMs()));
+        for (int i = 0; i < remainingJobs.size(); ++i) {
+            Assert.assertEquals(1010 - i, remainingJobs.get(i).getId());
+        }
+    }
+
     private ExportJob makeExportJob(long id, String label) {
         ExportJob job1 = new ExportJob(id);
         Deencapsulation.setField(job1, "label", label);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to