This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7e027f3d1f2 [enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889) 7e027f3d1f2 is described below commit 7e027f3d1f26ccd1436f9ec96135510200f4ed08 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Mon Jan 15 10:30:58 2024 +0800 [enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889) Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> --- .../main/java/org/apache/doris/common/Config.java | 8 +++ .../main/java/org/apache/doris/catalog/Env.java | 1 + .../org/apache/doris/load/loadv2/LoadManager.java | 73 ++++++++++++++++------ .../doris/load/routineload/RoutineLoadJob.java | 2 +- .../doris/load/routineload/RoutineLoadManager.java | 50 +++++++++++++-- .../load/routineload/RoutineLoadScheduler.java | 17 ++--- .../org/apache/doris/load/sync/SyncChecker.java | 1 + .../org/apache/doris/load/sync/SyncJobManager.java | 67 ++++++++++++++++---- .../doris/transaction/DatabaseTransactionMgr.java | 27 ++++++-- .../apache/doris/load/loadv2/LoadManagerTest.java | 46 +++++++++++++- .../load/routineload/RoutineLoadManagerTest.java | 43 ++++++++++++- .../apache/doris/load/sync/SyncJobManagerTest.java | 36 +++++++++++ .../transaction/DatabaseTransactionMgrTest.java | 12 +++- 13 files changed, 331 insertions(+), 52 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5a3685a2c5e..4d38f041fe5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2415,6 +2415,14 @@ public class Config extends ConfigBase { }) public static int http_load_submitter_max_worker_threads = 2; + @ConfField(mutable = true, masterOnly = true, description = { + "load label个数阈值,超过该个数后,对于已经完成导入作业或者任务,其label会被删除,被删除的 label 可以被重用。", + "The threshold of load labels' number. After this number is exceeded, " + + "the labels of the completed import jobs or tasks will be deleted, " + + "and the deleted labels can be reused." + }) + public static int label_num_threshold = 2000; + //========================================================================== // begin of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 9e371aadc0c..cc025656976 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2452,6 +2452,7 @@ public class Env { loadManager.removeOldLoadJob(); exportMgr.removeOldExportJobs(); deleteHandler.removeOldDeleteInfos(); + loadManager.removeOverLimitLoadJob(); } }; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 35968139ec8..1840494dcb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -65,8 +65,11 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -77,6 +80,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -407,29 +411,62 @@ public class LoadManager implements Writable { **/ public void removeOldLoadJob() { long currentTimeMs = System.currentTimeMillis(); + removeLoadJobIf(job -> job.isExpired(currentTimeMs)); + } + /** + * Remove completed jobs if total job num exceed Config.label_num_threshold + */ + public void removeOverLimitLoadJob() { + if (Config.label_num_threshold < 0 || idToLoadJob.size() <= Config.label_num_threshold) { + return; + } + writeLock(); + try { + Deque<LoadJob> finishedJobs = idToLoadJob + .values() + .stream() + .filter(LoadJob::isCompleted) + .sorted(Comparator.comparingLong(o -> o.finishTimestamp)) + .collect(Collectors.toCollection(ArrayDeque::new)); + while (!finishedJobs.isEmpty() + && idToLoadJob.size() > Config.label_num_threshold) { + LoadJob loadJob = finishedJobs.pollFirst(); + idToLoadJob.remove(loadJob.getId()); + jobRemovedTrigger(loadJob); + } + } finally { + writeUnlock(); + } + } + + private void jobRemovedTrigger(LoadJob job) { + Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(job.getDbId()); + List<LoadJob> list = map.get(job.getLabel()); + list.remove(job); + if (job instanceof SparkLoadJob) { + ((SparkLoadJob) job).clearSparkLauncherLog(); + } + if (job instanceof BulkLoadJob) { + ((BulkLoadJob) job).recycleProgress(); + } + if (list.isEmpty()) { + map.remove(job.getLabel()); + } + if (map.isEmpty()) { + dbIdToLabelToLoadJobs.remove(job.getDbId()); + } + } + + private void removeLoadJobIf(Predicate<LoadJob> pred) { writeLock(); try { Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator(); while (iter.hasNext()) { LoadJob job = iter.next().getValue(); - if (job.isExpired(currentTimeMs)) { + if (pred.test(job)) { iter.remove(); - Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(job.getDbId()); - List<LoadJob> list = map.get(job.getLabel()); - list.remove(job); - if (job instanceof SparkLoadJob) { - ((SparkLoadJob) job).clearSparkLauncherLog(); - } - if (job instanceof BulkLoadJob) { - ((BulkLoadJob) job).recycleProgress(); - } - if (list.isEmpty()) { - map.remove(job.getLabel()); - } - if (map.isEmpty()) { - dbIdToLabelToLoadJobs.remove(job.getDbId()); - } + jobRemovedTrigger(job); } } } finally { @@ -516,8 +553,8 @@ public class LoadManager implements Writable { * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself. * @param statesValue used to filter jobs which's state within the statesValue set. * @return The result is the list of jobInfo. - * JobInfo is a list which includes the comparable object: jobId, label, state etc. - * The result is unordered. + * JobInfo is a list which includes the comparable object: jobId, label, state etc. + * The result is unordered. */ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch, Set<String> statesValue) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 1ce3c1e8c0c..0a947701ef5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1670,7 +1670,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl abstract Map<String, String> getCustomProperties(); - public boolean needRemove() { + public boolean isExpired() { if (!isFinal()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 5fb400d9d83..dde2e31bb9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -62,7 +62,10 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -70,6 +73,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; public class RoutineLoadManager implements Writable { @@ -251,7 +255,7 @@ public class RoutineLoadManager implements Writable { if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) { - //todo add new error code + // todo add new error code ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), @@ -579,7 +583,7 @@ public class RoutineLoadManager implements Writable { else return all of result */ public List<RoutineLoadJob> getJob(String dbFullName, String jobName, - boolean includeHistory, PatternMatcher matcher) + boolean includeHistory, PatternMatcher matcher) throws MetaNotFoundException { Preconditions.checkArgument(jobName == null || matcher == null, "jobName and matcher cannot be not null at the same time"); @@ -683,19 +687,55 @@ public class RoutineLoadManager implements Writable { // Remove old routine load jobs from idToRoutineLoadJob // This function is called periodically. - // Cancelled and stopped job will be remove after Configure.label_keep_max_second seconds + // Cancelled and stopped job will be removed after Configure.label_keep_max_second seconds public void cleanOldRoutineLoadJobs() { LOG.debug("begin to clean old routine load jobs "); + clearRoutineLoadJobIf(RoutineLoadJob::isExpired); + } + + /** + * Remove finished routine load jobs from idToRoutineLoadJob + * This function is called periodically if Config.label_num_threshold is set. + * Cancelled and stopped job will be removed. + */ + public void cleanOverLimitRoutineLoadJobs() { + if (Config.label_num_threshold < 0 + || idToRoutineLoadJob.size() <= Config.label_num_threshold) { + return; + } + writeLock(); + try { + LOG.debug("begin to clean routine load jobs"); + Deque<RoutineLoadJob> finishedJobs = idToRoutineLoadJob + .values() + .stream() + .filter(RoutineLoadJob::isFinal) + .sorted(Comparator.comparingLong(o -> o.endTimestamp)) + .collect(Collectors.toCollection(ArrayDeque::new)); + while (!finishedJobs.isEmpty() + && idToRoutineLoadJob.size() > Config.label_num_threshold) { + RoutineLoadJob routineLoadJob = finishedJobs.pollFirst(); + unprotectedRemoveJobFromDb(routineLoadJob); + idToRoutineLoadJob.remove(routineLoadJob.getId()); + RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(), + routineLoadJob.getState()); + Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation); + } + } finally { + writeUnlock(); + } + } + + private void clearRoutineLoadJobIf(Predicate<RoutineLoadJob> pred) { writeLock(); try { Iterator<Map.Entry<Long, RoutineLoadJob>> iterator = idToRoutineLoadJob.entrySet().iterator(); long currentTimestamp = System.currentTimeMillis(); while (iterator.hasNext()) { RoutineLoadJob routineLoadJob = iterator.next().getValue(); - if (routineLoadJob.needRemove()) { + if (pred.test(routineLoadJob)) { unprotectedRemoveJobFromDb(routineLoadJob); iterator.remove(); - RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(), routineLoadJob.getState()); Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index b3b2c8baf2f..84f9548de13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -83,9 +83,9 @@ public class RoutineLoadScheduler extends MasterDaemon { if (desiredConcurrentTaskNum <= 0) { // the job will be rescheduled later. LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("msg", "the current concurrent num is less than or equal to zero, " - + "job will be rescheduled later") - .build()); + .add("msg", "the current concurrent num is less than or equal to zero, " + + "job will be rescheduled later") + .build()); continue; } // check state and divide job into tasks @@ -112,10 +112,10 @@ public class RoutineLoadScheduler extends MasterDaemon { routineLoadJob.updateState(errorJobState, reason, false); } catch (UserException e) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("current_state", routineLoadJob.getState()) - .add("desired_state", errorJobState) - .add("warn_msg", "failed to change state to desired state") - .build(), e); + .add("current_state", routineLoadJob.getState()) + .add("desired_state", errorJobState) + .add("warn_msg", "failed to change state to desired state") + .build(), e); } } } @@ -124,11 +124,12 @@ public class RoutineLoadScheduler extends MasterDaemon { routineLoadManager.processTimeoutTasks(); routineLoadManager.cleanOldRoutineLoadJobs(); + + routineLoadManager.cleanOverLimitRoutineLoadJobs(); } private List<RoutineLoadJob> getNeedScheduleRoutineJobs() throws LoadException { return routineLoadManager.getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE)); } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index c0fa55ac6ec..319589c9aa1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -79,5 +79,6 @@ public class SyncChecker extends MasterDaemon { private void cleanOldSyncJobs() { // clean up expired sync jobs this.syncJobManager.cleanOldSyncJobs(); + this.syncJobManager.cleanOverLimitSyncJobs(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java index c1533843ef9..5be922d6e2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt; import org.apache.doris.analysis.StopSyncJobStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; @@ -39,12 +40,17 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; public class SyncJobManager implements Writable { @@ -58,7 +64,7 @@ public class SyncJobManager implements Writable { public SyncJobManager() { idToSyncJob = Maps.newConcurrentMap(); - dbIdToJobNameToSyncJobs = Maps.newConcurrentMap(); + dbIdToJobNameToSyncJobs = Collections.synchronizedMap(Maps.newLinkedHashMap()); lock = new ReentrantReadWriteLock(true); } @@ -283,31 +289,68 @@ public class SyncJobManager implements Writable { // Stopped jobs will be removed after Config.label_keep_max_second. public void cleanOldSyncJobs() { LOG.debug("begin to clean old sync jobs "); + cleanFinishedSyncJobsIf(job -> job.isExpired(System.currentTimeMillis())); + } + + /** + * Remove completed jobs if total job num exceed Config.label_num_threshold + */ + public void cleanOverLimitSyncJobs() { + if (Config.label_num_threshold < 0 || idToSyncJob.size() <= Config.label_num_threshold) { + return; + } + writeLock(); + try { + LOG.debug("begin to clean finished sync jobs "); + Deque<SyncJob> finishedJobs = idToSyncJob + .values() + .stream() + .filter(SyncJob::isCompleted) + .sorted(Comparator.comparingLong(o -> o.finishTimeMs)) + .collect(Collectors.toCollection(ArrayDeque::new)); + while (!finishedJobs.isEmpty() && idToSyncJob.size() > Config.label_num_threshold) { + SyncJob syncJob = finishedJobs.pollFirst(); + if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) { + continue; + } + idToSyncJob.remove(syncJob.getId()); + jobRemovedTrigger(syncJob); + } + } finally { + writeUnlock(); + } + } + + private void jobRemovedTrigger(SyncJob syncJob) { + Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId()); + List<SyncJob> list = map.get(syncJob.getJobName()); + list.remove(syncJob); + if (list.isEmpty()) { + map.remove(syncJob.getJobName()); + } + if (map.isEmpty()) { + dbIdToJobNameToSyncJobs.remove(syncJob.getDbId()); + } + } + + public void cleanFinishedSyncJobsIf(Predicate<SyncJob> pred) { long currentTimeMs = System.currentTimeMillis(); writeLock(); try { Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator(); while (iterator.hasNext()) { SyncJob syncJob = iterator.next().getValue(); - if (syncJob.isExpired(currentTimeMs)) { + if (pred.test(syncJob)) { if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) { continue; } - Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId()); - List<SyncJob> list = map.get(syncJob.getJobName()); - list.remove(syncJob); - if (list.isEmpty()) { - map.remove(syncJob.getJobName()); - } - if (map.isEmpty()) { - dbIdToJobNameToSyncJobs.remove(syncJob.getDbId()); - } + jobRemovedTrigger(syncJob); iterator.remove(); LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId()) .add("finishTimeMs", syncJob.getFinishTimeMs()) .add("currentTimeMs", currentTimeMs) .add("jobState", syncJob.getJobState()) - .add("msg", "old sync job has been cleaned") + .add("msg", "sync job has been cleaned") ); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 57088cbd7a8..565df045ccc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -168,6 +168,8 @@ public class DatabaseTransactionMgr { private long lockReportingThresholdMs = Config.lock_reporting_threshold_ms; + private long maxFinalTxnsNum = Long.MAX_VALUE; + private void readLock() { this.transactionLock.readLock().lock(); } @@ -191,6 +193,9 @@ public class DatabaseTransactionMgr { this.env = env; this.idGenerator = idGenerator; this.editLog = env.getEditLog(); + if (Config.label_num_threshold >= 0) { + this.maxFinalTxnsNum = Config.label_num_threshold; + } } public long getDbId() { @@ -1622,13 +1627,13 @@ public class DatabaseTransactionMgr { return partitionInfos; } - public void removeExpiredTxns(long currentMillis) { + public void removeUselessTxns(long currentMillis) { // delete expired txns writeLock(); try { - Pair<Long, Integer> expiredTxnsInfoForShort = unprotectedRemoveExpiredTxns(currentMillis, + Pair<Long, Integer> expiredTxnsInfoForShort = unprotectedRemoveUselessTxns(currentMillis, finalStatusTransactionStateDequeShort, MAX_REMOVE_TXN_PER_ROUND); - Pair<Long, Integer> expiredTxnsInfoForLong = unprotectedRemoveExpiredTxns(currentMillis, + Pair<Long, Integer> expiredTxnsInfoForLong = unprotectedRemoveUselessTxns(currentMillis, finalStatusTransactionStateDequeLong, MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second); int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; @@ -1645,7 +1650,7 @@ public class DatabaseTransactionMgr { } } - private Pair<Long, Integer> unprotectedRemoveExpiredTxns(long currentMillis, + private Pair<Long, Integer> unprotectedRemoveUselessTxns(long currentMillis, ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int left) { long latestTxnId = -1; int numOfClearedTransaction = 0; @@ -1660,6 +1665,18 @@ public class DatabaseTransactionMgr { break; } } + while (finalStatusTransactionStateDeque.size() > maxFinalTxnsNum + && numOfClearedTransaction < left) { + TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); + if (transactionState.getFinishTime() != -1) { + finalStatusTransactionStateDeque.pop(); + clearTransactionState(transactionState.getTransactionId()); + latestTxnId = transactionState.getTransactionId(); + numOfClearedTransaction++; + } else { + break; + } + } return Pair.of(latestTxnId, numOfClearedTransaction); } @@ -2031,7 +2048,7 @@ public class DatabaseTransactionMgr { } public void removeExpiredAndTimeoutTxns(long currentMillis) { - removeExpiredTxns(currentMillis); + removeUselessTxns(currentMillis); List<Long> timeoutTxns = getTimeoutTxns(currentMillis); // abort timeout txns for (Long txnId : timeoutTxns) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index a30d25a944f..e9b3278cfd0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -40,6 +40,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.util.List; import java.util.Map; public class LoadManagerTest { @@ -123,7 +124,7 @@ public class LoadManagerTest { LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo); Deencapsulation.invoke(loadManager, "addLoadJob", job1); - //make job1 don't serialize + // make job1 don't serialize Config.streaming_label_keep_max_second = 1; Thread.sleep(2000); @@ -135,6 +136,49 @@ public class LoadManagerTest { Assert.assertEquals(0, newLoadJobs.size()); } + @Test + public void testCleanOverLimitJobs(@Mocked Env env, + @Mocked InternalCatalog catalog, @Injectable Database database, @Injectable Table table) throws Exception { + new Expectations() { + { + env.getNextId(); + returns(1L, 2L); + env.getInternalCatalog(); + minTimes = 0; + result = catalog; + catalog.getDbNullable(anyLong); + minTimes = 0; + result = database; + database.getTableNullable(anyLong); + minTimes = 0; + result = table; + table.getName(); + minTimes = 0; + result = "tablename"; + Env.getCurrentEnvJournalVersion(); + minTimes = 0; + result = FeMetaVersion.VERSION_CURRENT; + } + }; + + loadManager = new LoadManager(new LoadJobScheduler()); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo); + Thread.sleep(100); + LoadJob job2 = new InsertLoadJob("job2", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo); + Deencapsulation.invoke(loadManager, "addLoadJob", job2); + Deencapsulation.invoke(loadManager, "addLoadJob", job1); + Config.label_num_threshold = 1; + loadManager.removeOverLimitLoadJob(); + Map<Long, LoadJob> idToJobs = Deencapsulation.getField(loadManager, fieldName); + Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Deencapsulation.getField(loadManager, + "dbIdToLabelToLoadJobs"); + Assert.assertEquals(1, idToJobs.size()); + Assert.assertEquals(1, dbIdToLabelToLoadJobs.size()); + LoadJob loadJob = idToJobs.get(job2.getId()); + Assert.assertEquals("job2", loadJob.getLabel()); + Assert.assertNotNull(dbIdToLabelToLoadJobs.get(1L).get("job2")); + } + private File serializeToFile(LoadManager loadManager) throws Exception { File file = new File("./loadManagerTest"); file.createNewFile(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index c2f46ffa53c..156dae72234 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -801,7 +801,7 @@ public class RoutineLoadManagerTest { new Expectations() { { - routineLoadJob.needRemove(); + routineLoadJob.isExpired(); minTimes = 0; result = true; routineLoadJob.getDbId(); @@ -821,6 +821,47 @@ public class RoutineLoadManagerTest { Assert.assertEquals(0, idToRoutineLoadJob.size()); } + @Test + public void testCleanOverLimitRoutineLoadJobs(@Injectable RoutineLoadJob routineLoadJob, + @Mocked Env env, @Mocked EditLog editLog) { + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); + Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); + List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); + routineLoadJobList.add(routineLoadJob); + nameToRoutineLoadJob.put("", routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); + idToRoutineLoadJob.put(1L, routineLoadJob); + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + + new Expectations() { + { + routineLoadJob.getId(); + minTimes = 0; + result = 1L; + routineLoadJob.isFinal(); + minTimes = 0; + result = true; + routineLoadJob.getDbId(); + minTimes = 0; + result = 1L; + routineLoadJob.getName(); + minTimes = 0; + result = ""; + env.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + Config.label_num_threshold = 0; + + routineLoadManager.cleanOverLimitRoutineLoadJobs(); + Assert.assertEquals(0, dbToNameToRoutineLoadJob.size()); + Assert.assertEquals(0, idToRoutineLoadJob.size()); + } + @Test public void testGetBeIdConcurrentTaskMaps(@Injectable RoutineLoadJob routineLoadJob) { RoutineLoadManager routineLoadManager = new RoutineLoadManager(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java index d0badf19701..dc239dde597 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt; import org.apache.doris.analysis.StopSyncJobStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; @@ -446,5 +447,40 @@ public class SyncJobManagerTest { Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size()); } + @Test + public void testCleanOverLimitJobs() { + SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId); + // change sync job state to cancelled + try { + canalSyncJob.updateState(JobState.CANCELLED, false); + } catch (UserException e) { + Assert.fail(); + } + Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState()); + + SyncJobManager manager = new SyncJobManager(); + // add a sync job to manager + Map<Long, SyncJob> idToSyncJob = Maps.newHashMap(); + idToSyncJob.put(jobId, canalSyncJob); + Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap(); + Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap(); + jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob)); + dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs); + + Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob); + Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs); + + new Expectations(canalSyncJob) { + { + canalSyncJob.isCompleted(); + result = true; + } + }; + Config.label_num_threshold = 0; + manager.cleanOverLimitSyncJobs(); + + Assert.assertEquals(0, idToSyncJob.size()); + Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index b988650fa2f..9108570e5e4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -240,7 +240,17 @@ public class DatabaseTransactionMgrTest { DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); Config.label_keep_max_second = -1; long currentMillis = System.currentTimeMillis(); - masterDbTransMgr.removeExpiredTxns(currentMillis); + masterDbTransMgr.removeUselessTxns(currentMillis); + Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums()); + Assert.assertEquals(3, masterDbTransMgr.getTransactionNum()); + Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1)); + } + + @Test + public void testRemoveOverLimitTxns() throws AnalysisException { + DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); + Config.label_num_threshold = 0; + masterDbTransMgr.removeUselessTxns(System.currentTimeMillis()); Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums()); Assert.assertEquals(3, masterDbTransMgr.getTransactionNum()); Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org