This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e027f3d1f2 [enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889)
7e027f3d1f2 is described below

commit 7e027f3d1f26ccd1436f9ec96135510200f4ed08
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Mon Jan 15 10:30:58 2024 +0800

    [enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889)
    
    
    
    Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
---
 .../main/java/org/apache/doris/common/Config.java  |  8 +++
 .../main/java/org/apache/doris/catalog/Env.java    |  1 +
 .../org/apache/doris/load/loadv2/LoadManager.java  | 73 ++++++++++++++++------
 .../doris/load/routineload/RoutineLoadJob.java     |  2 +-
 .../doris/load/routineload/RoutineLoadManager.java | 50 +++++++++++++--
 .../load/routineload/RoutineLoadScheduler.java     | 17 ++---
 .../org/apache/doris/load/sync/SyncChecker.java    |  1 +
 .../org/apache/doris/load/sync/SyncJobManager.java | 67 ++++++++++++++++----
 .../doris/transaction/DatabaseTransactionMgr.java  | 27 ++++++--
 .../apache/doris/load/loadv2/LoadManagerTest.java  | 46 +++++++++++++-
 .../load/routineload/RoutineLoadManagerTest.java   | 43 ++++++++++++-
 .../apache/doris/load/sync/SyncJobManagerTest.java | 36 +++++++++++
 .../transaction/DatabaseTransactionMgrTest.java    | 12 +++-
 13 files changed, 331 insertions(+), 52 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5a3685a2c5e..4d38f041fe5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2415,6 +2415,14 @@ public class Config extends ConfigBase {
     })
     public static int http_load_submitter_max_worker_threads = 2;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "load label个数阈值,超过该个数后,对于已经完成导入作业或者任务,其label会被删除,被删除的 label 
可以被重用。",
+            "The threshold of load labels' number. After this number is 
exceeded, "
+                    + "the labels of the completed import jobs or tasks will 
be deleted, "
+                    + "and the deleted labels can be reused."
+    })
+    public static int label_num_threshold = 2000;
+
     
//==========================================================================
     //                    begin of cloud config
     
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 9e371aadc0c..cc025656976 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2452,6 +2452,7 @@ public class Env {
                 loadManager.removeOldLoadJob();
                 exportMgr.removeOldExportJobs();
                 deleteHandler.removeOldDeleteInfos();
+                loadManager.removeOverLimitLoadJob();
             }
         };
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 35968139ec8..1840494dcb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -65,8 +65,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -77,6 +80,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -407,29 +411,62 @@ public class LoadManager implements Writable {
      **/
     public void removeOldLoadJob() {
         long currentTimeMs = System.currentTimeMillis();
+        removeLoadJobIf(job -> job.isExpired(currentTimeMs));
+    }
 
+    /**
+     * Remove completed jobs if total job num exceed Config.label_num_threshold
+     */
+    public void removeOverLimitLoadJob() {
+        if (Config.label_num_threshold < 0 || idToLoadJob.size() <= 
Config.label_num_threshold) {
+            return;
+        }
+        writeLock();
+        try {
+            Deque<LoadJob> finishedJobs = idToLoadJob
+                    .values()
+                    .stream()
+                    .filter(LoadJob::isCompleted)
+                    .sorted(Comparator.comparingLong(o -> o.finishTimestamp))
+                    .collect(Collectors.toCollection(ArrayDeque::new));
+            while (!finishedJobs.isEmpty()
+                    && idToLoadJob.size() > Config.label_num_threshold) {
+                LoadJob loadJob = finishedJobs.pollFirst();
+                idToLoadJob.remove(loadJob.getId());
+                jobRemovedTrigger(loadJob);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void jobRemovedTrigger(LoadJob job) {
+        Map<String, List<LoadJob>> map = 
dbIdToLabelToLoadJobs.get(job.getDbId());
+        List<LoadJob> list = map.get(job.getLabel());
+        list.remove(job);
+        if (job instanceof SparkLoadJob) {
+            ((SparkLoadJob) job).clearSparkLauncherLog();
+        }
+        if (job instanceof BulkLoadJob) {
+            ((BulkLoadJob) job).recycleProgress();
+        }
+        if (list.isEmpty()) {
+            map.remove(job.getLabel());
+        }
+        if (map.isEmpty()) {
+            dbIdToLabelToLoadJobs.remove(job.getDbId());
+        }
+    }
+
+    private void removeLoadJobIf(Predicate<LoadJob> pred) {
         writeLock();
         try {
             Iterator<Map.Entry<Long, LoadJob>> iter = 
idToLoadJob.entrySet().iterator();
             while (iter.hasNext()) {
                 LoadJob job = iter.next().getValue();
-                if (job.isExpired(currentTimeMs)) {
+                if (pred.test(job)) {
                     iter.remove();
-                    Map<String, List<LoadJob>> map = 
dbIdToLabelToLoadJobs.get(job.getDbId());
-                    List<LoadJob> list = map.get(job.getLabel());
-                    list.remove(job);
-                    if (job instanceof SparkLoadJob) {
-                        ((SparkLoadJob) job).clearSparkLauncherLog();
-                    }
-                    if (job instanceof BulkLoadJob) {
-                        ((BulkLoadJob) job).recycleProgress();
-                    }
-                    if (list.isEmpty()) {
-                        map.remove(job.getLabel());
-                    }
-                    if (map.isEmpty()) {
-                        dbIdToLabelToLoadJobs.remove(job.getDbId());
-                    }
+                    jobRemovedTrigger(job);
                 }
             }
         } finally {
@@ -516,8 +553,8 @@ public class LoadManager implements Writable {
      * @param accurateMatch true: filter jobs which's label is labelValue. 
false: filter jobs which's label like itself.
      * @param statesValue   used to filter jobs which's state within the 
statesValue set.
      * @return The result is the list of jobInfo.
-     * JobInfo is a list which includes the comparable object: jobId, label, 
state etc.
-     * The result is unordered.
+     *         JobInfo is a list which includes the comparable object: jobId, 
label, state etc.
+     *         The result is unordered.
      */
     public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String 
labelValue, boolean accurateMatch,
                                                       Set<String> statesValue) 
throws AnalysisException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 1ce3c1e8c0c..0a947701ef5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1670,7 +1670,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
     abstract Map<String, String> getCustomProperties();
 
-    public boolean needRemove() {
+    public boolean isExpired() {
         if (!isFinal()) {
             return false;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 5fb400d9d83..dde2e31bb9a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -62,7 +62,10 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -70,6 +73,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class RoutineLoadManager implements Writable {
@@ -251,7 +255,7 @@ public class RoutineLoadManager implements Writable {
             if 
(!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                     dbFullName,
                     PrivPredicate.LOAD)) {
-                //todo add new error code
+                // todo add new error code
                 
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"LOAD",
                         ConnectContext.get().getQualifiedUser(),
                         ConnectContext.get().getRemoteIP(),
@@ -579,7 +583,7 @@ public class RoutineLoadManager implements Writable {
       else return all of result
      */
     public List<RoutineLoadJob> getJob(String dbFullName, String jobName,
-                                       boolean includeHistory, PatternMatcher 
matcher)
+            boolean includeHistory, PatternMatcher matcher)
             throws MetaNotFoundException {
         Preconditions.checkArgument(jobName == null || matcher == null,
                 "jobName and matcher cannot be not null at the same time");
@@ -683,19 +687,55 @@ public class RoutineLoadManager implements Writable {
 
     // Remove old routine load jobs from idToRoutineLoadJob
     // This function is called periodically.
-    // Cancelled and stopped job will be remove after 
Configure.label_keep_max_second seconds
+    // Cancelled and stopped job will be removed after 
Configure.label_keep_max_second seconds
     public void cleanOldRoutineLoadJobs() {
         LOG.debug("begin to clean old routine load jobs ");
+        clearRoutineLoadJobIf(RoutineLoadJob::isExpired);
+    }
+
+    /**
+     * Remove finished routine load jobs from idToRoutineLoadJob
+     * This function is called periodically if Config.label_num_threshold is 
set.
+     * Cancelled and stopped job will be removed.
+     */
+    public void cleanOverLimitRoutineLoadJobs() {
+        if (Config.label_num_threshold < 0
+                || idToRoutineLoadJob.size() <= Config.label_num_threshold) {
+            return;
+        }
+        writeLock();
+        try {
+            LOG.debug("begin to clean routine load jobs");
+            Deque<RoutineLoadJob> finishedJobs = idToRoutineLoadJob
+                    .values()
+                    .stream()
+                    .filter(RoutineLoadJob::isFinal)
+                    .sorted(Comparator.comparingLong(o -> o.endTimestamp))
+                    .collect(Collectors.toCollection(ArrayDeque::new));
+            while (!finishedJobs.isEmpty()
+                    && idToRoutineLoadJob.size() > Config.label_num_threshold) 
{
+                RoutineLoadJob routineLoadJob = finishedJobs.pollFirst();
+                unprotectedRemoveJobFromDb(routineLoadJob);
+                idToRoutineLoadJob.remove(routineLoadJob.getId());
+                RoutineLoadOperation operation = new 
RoutineLoadOperation(routineLoadJob.getId(),
+                        routineLoadJob.getState());
+                
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void clearRoutineLoadJobIf(Predicate<RoutineLoadJob> pred) {
         writeLock();
         try {
             Iterator<Map.Entry<Long, RoutineLoadJob>> iterator = 
idToRoutineLoadJob.entrySet().iterator();
             long currentTimestamp = System.currentTimeMillis();
             while (iterator.hasNext()) {
                 RoutineLoadJob routineLoadJob = iterator.next().getValue();
-                if (routineLoadJob.needRemove()) {
+                if (pred.test(routineLoadJob)) {
                     unprotectedRemoveJobFromDb(routineLoadJob);
                     iterator.remove();
-
                     RoutineLoadOperation operation = new 
RoutineLoadOperation(routineLoadJob.getId(),
                             routineLoadJob.getState());
                     
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index b3b2c8baf2f..84f9548de13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -83,9 +83,9 @@ public class RoutineLoadScheduler extends MasterDaemon {
                 if (desiredConcurrentTaskNum <= 0) {
                     // the job will be rescheduled later.
                     LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
-                                     .add("msg", "the current concurrent num 
is less than or equal to zero, "
-                                             + "job will be rescheduled later")
-                                     .build());
+                            .add("msg", "the current concurrent num is less 
than or equal to zero, "
+                                    + "job will be rescheduled later")
+                            .build());
                     continue;
                 }
                 // check state and divide job into tasks
@@ -112,10 +112,10 @@ public class RoutineLoadScheduler extends MasterDaemon {
                     routineLoadJob.updateState(errorJobState, reason, false);
                 } catch (UserException e) {
                     LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
-                                     .add("current_state", 
routineLoadJob.getState())
-                                     .add("desired_state", errorJobState)
-                                     .add("warn_msg", "failed to change state 
to desired state")
-                                     .build(), e);
+                            .add("current_state", routineLoadJob.getState())
+                            .add("desired_state", errorJobState)
+                            .add("warn_msg", "failed to change state to 
desired state")
+                            .build(), e);
                 }
             }
         }
@@ -124,11 +124,12 @@ public class RoutineLoadScheduler extends MasterDaemon {
         routineLoadManager.processTimeoutTasks();
 
         routineLoadManager.cleanOldRoutineLoadJobs();
+
+        routineLoadManager.cleanOverLimitRoutineLoadJobs();
     }
 
     private List<RoutineLoadJob> getNeedScheduleRoutineJobs() throws 
LoadException {
         return 
routineLoadManager.getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE));
     }
 
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index c0fa55ac6ec..319589c9aa1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -79,5 +79,6 @@ public class SyncChecker extends MasterDaemon {
     private void cleanOldSyncJobs() {
         // clean up expired sync jobs
         this.syncJobManager.cleanOldSyncJobs();
+        this.syncJobManager.cleanOverLimitSyncJobs();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index c1533843ef9..5be922d6e2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt;
 import org.apache.doris.analysis.StopSyncJobStmt;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
@@ -39,12 +40,17 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class SyncJobManager implements Writable {
@@ -58,7 +64,7 @@ public class SyncJobManager implements Writable {
 
     public SyncJobManager() {
         idToSyncJob = Maps.newConcurrentMap();
-        dbIdToJobNameToSyncJobs = Maps.newConcurrentMap();
+        dbIdToJobNameToSyncJobs = 
Collections.synchronizedMap(Maps.newLinkedHashMap());
         lock = new ReentrantReadWriteLock(true);
     }
 
@@ -283,31 +289,68 @@ public class SyncJobManager implements Writable {
     // Stopped jobs will be removed after Config.label_keep_max_second.
     public void cleanOldSyncJobs() {
         LOG.debug("begin to clean old sync jobs ");
+        cleanFinishedSyncJobsIf(job -> 
job.isExpired(System.currentTimeMillis()));
+    }
+
+    /**
+     * Remove completed jobs if total job num exceed Config.label_num_threshold
+     */
+    public void cleanOverLimitSyncJobs() {
+        if (Config.label_num_threshold < 0 || idToSyncJob.size() <= 
Config.label_num_threshold) {
+            return;
+        }
+        writeLock();
+        try {
+            LOG.debug("begin to clean finished sync jobs ");
+            Deque<SyncJob> finishedJobs = idToSyncJob
+                    .values()
+                    .stream()
+                    .filter(SyncJob::isCompleted)
+                    .sorted(Comparator.comparingLong(o -> o.finishTimeMs))
+                    .collect(Collectors.toCollection(ArrayDeque::new));
+            while (!finishedJobs.isEmpty() && idToSyncJob.size() > 
Config.label_num_threshold) {
+                SyncJob syncJob = finishedJobs.pollFirst();
+                if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
+                    continue;
+                }
+                idToSyncJob.remove(syncJob.getId());
+                jobRemovedTrigger(syncJob);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void jobRemovedTrigger(SyncJob syncJob) {
+        Map<String, List<SyncJob>> map = 
dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
+        List<SyncJob> list = map.get(syncJob.getJobName());
+        list.remove(syncJob);
+        if (list.isEmpty()) {
+            map.remove(syncJob.getJobName());
+        }
+        if (map.isEmpty()) {
+            dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
+        }
+    }
+
+    public void cleanFinishedSyncJobsIf(Predicate<SyncJob> pred) {
         long currentTimeMs = System.currentTimeMillis();
         writeLock();
         try {
             Iterator<Map.Entry<Long, SyncJob>> iterator = 
idToSyncJob.entrySet().iterator();
             while (iterator.hasNext()) {
                 SyncJob syncJob = iterator.next().getValue();
-                if (syncJob.isExpired(currentTimeMs)) {
+                if (pred.test(syncJob)) {
                     if 
(!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
                         continue;
                     }
-                    Map<String, List<SyncJob>> map = 
dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
-                    List<SyncJob> list = map.get(syncJob.getJobName());
-                    list.remove(syncJob);
-                    if (list.isEmpty()) {
-                        map.remove(syncJob.getJobName());
-                    }
-                    if (map.isEmpty()) {
-                        dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
-                    }
+                    jobRemovedTrigger(syncJob);
                     iterator.remove();
                     LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
                             .add("finishTimeMs", syncJob.getFinishTimeMs())
                             .add("currentTimeMs", currentTimeMs)
                             .add("jobState", syncJob.getJobState())
-                            .add("msg", "old sync job has been cleaned")
+                            .add("msg", "sync job has been cleaned")
                     );
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 57088cbd7a8..565df045ccc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -168,6 +168,8 @@ public class DatabaseTransactionMgr {
 
     private long lockReportingThresholdMs = Config.lock_reporting_threshold_ms;
 
+    private long maxFinalTxnsNum = Long.MAX_VALUE;
+
     private void readLock() {
         this.transactionLock.readLock().lock();
     }
@@ -191,6 +193,9 @@ public class DatabaseTransactionMgr {
         this.env = env;
         this.idGenerator = idGenerator;
         this.editLog = env.getEditLog();
+        if (Config.label_num_threshold >= 0) {
+            this.maxFinalTxnsNum = Config.label_num_threshold;
+        }
     }
 
     public long getDbId() {
@@ -1622,13 +1627,13 @@ public class DatabaseTransactionMgr {
         return partitionInfos;
     }
 
-    public void removeExpiredTxns(long currentMillis) {
+    public void removeUselessTxns(long currentMillis) {
         // delete expired txns
         writeLock();
         try {
-            Pair<Long, Integer> expiredTxnsInfoForShort = 
unprotectedRemoveExpiredTxns(currentMillis,
+            Pair<Long, Integer> expiredTxnsInfoForShort = 
unprotectedRemoveUselessTxns(currentMillis,
                     finalStatusTransactionStateDequeShort, 
MAX_REMOVE_TXN_PER_ROUND);
-            Pair<Long, Integer> expiredTxnsInfoForLong = 
unprotectedRemoveExpiredTxns(currentMillis,
+            Pair<Long, Integer> expiredTxnsInfoForLong = 
unprotectedRemoveUselessTxns(currentMillis,
                     finalStatusTransactionStateDequeLong,
                     MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
             int numOfClearedTransaction = expiredTxnsInfoForShort.second + 
expiredTxnsInfoForLong.second;
@@ -1645,7 +1650,7 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    private Pair<Long, Integer> unprotectedRemoveExpiredTxns(long 
currentMillis,
+    private Pair<Long, Integer> unprotectedRemoveUselessTxns(long 
currentMillis,
             ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int 
left) {
         long latestTxnId = -1;
         int numOfClearedTransaction = 0;
@@ -1660,6 +1665,18 @@ public class DatabaseTransactionMgr {
                 break;
             }
         }
+        while (finalStatusTransactionStateDeque.size() > maxFinalTxnsNum
+                && numOfClearedTransaction < left) {
+            TransactionState transactionState = 
finalStatusTransactionStateDeque.getFirst();
+            if (transactionState.getFinishTime() != -1) {
+                finalStatusTransactionStateDeque.pop();
+                clearTransactionState(transactionState.getTransactionId());
+                latestTxnId = transactionState.getTransactionId();
+                numOfClearedTransaction++;
+            } else {
+                break;
+            }
+        }
         return Pair.of(latestTxnId, numOfClearedTransaction);
     }
 
@@ -2031,7 +2048,7 @@ public class DatabaseTransactionMgr {
     }
 
     public void removeExpiredAndTimeoutTxns(long currentMillis) {
-        removeExpiredTxns(currentMillis);
+        removeUselessTxns(currentMillis);
         List<Long> timeoutTxns = getTimeoutTxns(currentMillis);
         // abort timeout txns
         for (Long txnId : timeoutTxns) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index a30d25a944f..e9b3278cfd0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -40,6 +40,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.util.List;
 import java.util.Map;
 
 public class LoadManagerTest {
@@ -123,7 +124,7 @@ public class LoadManagerTest {
         LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, 
System.currentTimeMillis(), "", "", userInfo);
         Deencapsulation.invoke(loadManager, "addLoadJob", job1);
 
-        //make job1 don't serialize
+        // make job1 don't serialize
         Config.streaming_label_keep_max_second = 1;
         Thread.sleep(2000);
 
@@ -135,6 +136,49 @@ public class LoadManagerTest {
         Assert.assertEquals(0, newLoadJobs.size());
     }
 
+    @Test
+    public void testCleanOverLimitJobs(@Mocked Env env,
+            @Mocked InternalCatalog catalog, @Injectable Database database, 
@Injectable Table table) throws Exception {
+        new Expectations() {
+            {
+                env.getNextId();
+                returns(1L, 2L);
+                env.getInternalCatalog();
+                minTimes = 0;
+                result = catalog;
+                catalog.getDbNullable(anyLong);
+                minTimes = 0;
+                result = database;
+                database.getTableNullable(anyLong);
+                minTimes = 0;
+                result = table;
+                table.getName();
+                minTimes = 0;
+                result = "tablename";
+                Env.getCurrentEnvJournalVersion();
+                minTimes = 0;
+                result = FeMetaVersion.VERSION_CURRENT;
+            }
+        };
+
+        loadManager = new LoadManager(new LoadJobScheduler());
+        LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, 
System.currentTimeMillis(), "", "", userInfo);
+        Thread.sleep(100);
+        LoadJob job2 = new InsertLoadJob("job2", 1L, 1L, 1L, 
System.currentTimeMillis(), "", "", userInfo);
+        Deencapsulation.invoke(loadManager, "addLoadJob", job2);
+        Deencapsulation.invoke(loadManager, "addLoadJob", job1);
+        Config.label_num_threshold = 1;
+        loadManager.removeOverLimitLoadJob();
+        Map<Long, LoadJob> idToJobs = Deencapsulation.getField(loadManager, 
fieldName);
+        Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = 
Deencapsulation.getField(loadManager,
+                "dbIdToLabelToLoadJobs");
+        Assert.assertEquals(1, idToJobs.size());
+        Assert.assertEquals(1, dbIdToLabelToLoadJobs.size());
+        LoadJob loadJob = idToJobs.get(job2.getId());
+        Assert.assertEquals("job2", loadJob.getLabel());
+        Assert.assertNotNull(dbIdToLabelToLoadJobs.get(1L).get("job2"));
+    }
+
     private File serializeToFile(LoadManager loadManager) throws Exception {
         File file = new File("./loadManagerTest");
         file.createNewFile();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index c2f46ffa53c..156dae72234 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -801,7 +801,7 @@ public class RoutineLoadManagerTest {
 
         new Expectations() {
             {
-                routineLoadJob.needRemove();
+                routineLoadJob.isExpired();
                 minTimes = 0;
                 result = true;
                 routineLoadJob.getDbId();
@@ -821,6 +821,47 @@ public class RoutineLoadManagerTest {
         Assert.assertEquals(0, idToRoutineLoadJob.size());
     }
 
+    @Test
+    public void testCleanOverLimitRoutineLoadJobs(@Injectable RoutineLoadJob 
routineLoadJob,
+            @Mocked Env env, @Mocked EditLog editLog) {
+        RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+        Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob 
= Maps.newHashMap();
+        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = 
Maps.newHashMap();
+        List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
+        routineLoadJobList.add(routineLoadJob);
+        nameToRoutineLoadJob.put("", routineLoadJobList);
+        dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
+        Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap();
+        idToRoutineLoadJob.put(1L, routineLoadJob);
+        Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", 
idToRoutineLoadJob);
+        Deencapsulation.setField(routineLoadManager, 
"dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
+
+        new Expectations() {
+            {
+                routineLoadJob.getId();
+                minTimes = 0;
+                result = 1L;
+                routineLoadJob.isFinal();
+                minTimes = 0;
+                result = true;
+                routineLoadJob.getDbId();
+                minTimes = 0;
+                result = 1L;
+                routineLoadJob.getName();
+                minTimes = 0;
+                result = "";
+                env.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+        Config.label_num_threshold = 0;
+
+        routineLoadManager.cleanOverLimitRoutineLoadJobs();
+        Assert.assertEquals(0, dbToNameToRoutineLoadJob.size());
+        Assert.assertEquals(0, idToRoutineLoadJob.size());
+    }
+
     @Test
     public void testGetBeIdConcurrentTaskMaps(@Injectable RoutineLoadJob 
routineLoadJob) {
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
index d0badf19701..dc239dde597 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt;
 import org.apache.doris.analysis.StopSyncJobStmt;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
@@ -446,5 +447,47 @@ public class SyncJobManagerTest {
         Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
     }
 
+    @Test
+    public void testCleanOverLimitJobs() {
+        SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        // change sync job state to cancelled
+        try {
+            canalSyncJob.updateState(JobState.CANCELLED, false);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+
+        SyncJobManager manager = new SyncJobManager();
 
+        // add a sync job to manager
+        Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+        idToSyncJob.put(jobId, canalSyncJob);
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs);
+
+        Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+        new Expectations(canalSyncJob) {
+            {
+                canalSyncJob.isCompleted();
+                result = true;
+            }
+        };
+        // Config.label_num_threshold is static, JVM-wide state: save it and restore it afterwards
+        // so that forcing it to 0 here cannot leak into other tests running in the same JVM.
+        int originalLabelNumThreshold = Config.label_num_threshold;
+        Config.label_num_threshold = 0;
+        try {
+            manager.cleanOverLimitSyncJobs();
+        } finally {
+            Config.label_num_threshold = originalLabelNumThreshold;
+        }
+
+        Assert.assertEquals(0, idToSyncJob.size());
+        Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index b988650fa2f..9108570e5e4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -240,7 +240,17 @@ public class DatabaseTransactionMgrTest {
         DatabaseTransactionMgr masterDbTransMgr = 
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
         Config.label_keep_max_second = -1;
         long currentMillis = System.currentTimeMillis();
-        masterDbTransMgr.removeExpiredTxns(currentMillis);
+        masterDbTransMgr.removeUselessTxns(currentMillis);
+        Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums());
+        Assert.assertEquals(3, masterDbTransMgr.getTransactionNum());
+        
Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1));
+    }
+
+    @Test
+    public void testRemoveOverLimitTxns() throws AnalysisException {
+        DatabaseTransactionMgr masterDbTransMgr = 
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
+        Config.label_num_threshold = 0;
+        masterDbTransMgr.removeUselessTxns(System.currentTimeMillis());
         Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums());
         Assert.assertEquals(3, masterDbTransMgr.getTransactionNum());
         
Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to