This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d08a418dd85 [branch-2.1](routine-load) optimize routine load job auto resume policy (#37373) d08a418dd85 is described below commit d08a418dd85d6eb5f01aecd1f5aaa7568c781ef1 Author: hui lai <1353307...@qq.com> AuthorDate: Sun Jul 7 18:16:56 2024 +0800 [branch-2.1](routine-load) optimize routine load job auto resume policy (#37373) pick #35266 --- .../main/java/org/apache/doris/common/Config.java | 2 +- .../doris/load/routineload/RoutineLoadJob.java | 3 +- .../doris/load/routineload/RoutineLoadManager.java | 3 +- .../doris/load/routineload/ScheduleRule.java | 40 ++++++++++------------ .../load/routineload/RoutineLoadManagerTest.java | 11 +++--- 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 26ef8852b45..94d5725c38a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1257,7 +1257,7 @@ public class Config extends ConfigBase { * a period for auto resume routine load */ @ConfField(mutable = true, masterOnly = true) - public static int period_of_auto_resume_min = 5; + public static int period_of_auto_resume_min = 10; /** * If set to true, the backend will be automatically dropped after finishing decommission. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8c8dd7eaad9..130bd87b018 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; - protected long firstResumeTimestamp; // the first resume time + protected long latestResumeTimestamp; // the latest resume time protected long autoResumeCount; - protected boolean autoResumeLock = false; //it can't auto resume iff true // some other msg which need to show to user; protected String otherMsg = ""; protected ErrorReason pauseReason; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 356262f8c2a..6121f34035c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -367,8 +367,7 @@ public class RoutineLoadManager implements Writable { try { routineLoadJob.jobStatistic.errorRowsAfterResumed = 0; routineLoadJob.autoResumeCount = 0; - routineLoadJob.firstResumeTimestamp = 0; - routineLoadJob.autoResumeLock = false; + routineLoadJob.latestResumeTimestamp = 0; routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) .add("current_state", routineLoadJob.getState()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java 
index 2b4ef7b297a..dfc47aa496e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java @@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger; public class ScheduleRule { private static final Logger LOG = LogManager.getLogger(ScheduleRule.class); + private static final long BACK_OFF_BASIC_TIME_SEC = 10L; + + private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5; + private static int deadBeCount() { SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); int total = systemInfoService.getAllBackendIds(false).size(); @@ -47,22 +51,10 @@ public class ScheduleRule { if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) { return false; } - if (jobRoutine.autoResumeLock) { //only manual resume for unlock - if (LOG.isDebugEnabled()) { - LOG.debug("routine load job {}'s autoResumeLock is true, skip", jobRoutine.id); - } - return false; - } /* * Handle all backends are down. */ - if (LOG.isDebugEnabled()) { - LOG.debug("try to auto reschedule routine load {}, firstResumeTimestamp: {}, autoResumeCount: {}, " - + "pause reason: {}", - jobRoutine.id, jobRoutine.firstResumeTimestamp, jobRoutine.autoResumeCount, - jobRoutine.pauseReason == null ? 
"null" : jobRoutine.pauseReason.getCode().name()); - } if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR && jobRoutine.pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR @@ -77,19 +69,25 @@ public class ScheduleRule { return false; } - if (jobRoutine.firstResumeTimestamp == 0) { //the first resume - jobRoutine.firstResumeTimestamp = System.currentTimeMillis(); + if (jobRoutine.latestResumeTimestamp == 0) { //the first resume + jobRoutine.latestResumeTimestamp = System.currentTimeMillis(); jobRoutine.autoResumeCount = 1; return true; } else { long current = System.currentTimeMillis(); - if (current - jobRoutine.firstResumeTimestamp < Config.period_of_auto_resume_min * 60000L) { - if (jobRoutine.autoResumeCount >= 3) { - jobRoutine.autoResumeLock = true; // locked Auto Resume RoutineLoadJob - return false; + if (current - jobRoutine.latestResumeTimestamp < Config.period_of_auto_resume_min * 60000L) { + long autoResumeIntervalTimeSec = + Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, + MAX_BACK_OFF_TIME_SEC); + if (current - jobRoutine.latestResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { + LOG.info("try to auto reschedule routine load {}, latestResumeTimestamp: {}," + + " autoResumeCount: {}, pause reason: {}", + jobRoutine.id, jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount, + jobRoutine.pauseReason == null ? 
"null" : jobRoutine.pauseReason.getCode().name()); + jobRoutine.latestResumeTimestamp = System.currentTimeMillis(); + jobRoutine.autoResumeCount++; + return true; } - jobRoutine.autoResumeCount++; - return true; } else { /** * for example: @@ -98,7 +96,7 @@ public class ScheduleRule { * the third resume time at 10:20 * --> we must be reset counter because a new period for AutoResume RoutineLoadJob */ - jobRoutine.firstResumeTimestamp = current; + jobRoutine.latestResumeTimestamp = current; jobRoutine.autoResumeCount = 1; return true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 737936191e8..0eeb22ba2cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -638,21 +638,20 @@ public class RoutineLoadManagerTest { Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); - // 第一次自动恢复 for (int i = 0; i < 3; i++) { Deencapsulation.setField(routineLoadJob, "pauseReason", new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, "")); + try { + Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L)); + } catch (InterruptedException e) { + throw new UserException("thread sleep failed"); + } routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); - boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); - Assert.assertEquals(autoResumeLock, false); } - // 第四次自动恢复 就会锁定 routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); - boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); - 
Assert.assertEquals(autoResumeLock, true); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org