This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0034c63b86b branch-2.1: [fix](routine load) fix incorrect auto-resume interval caused by excessive auto-resume attempts #47528 (#47811) 0034c63b86b is described below commit 0034c63b86bfcb0b8040cdac947a10d410922989 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Feb 14 20:36:57 2025 +0800 branch-2.1: [fix](routine load) fix incorrect auto-resume interval caused by excessive auto-resume attempts #47528 (#47811) Cherry-picked from #47528 Co-authored-by: hui lai <lai...@selectdb.com> --- .../doris/load/routineload/ScheduleRule.java | 14 +++++++--- .../load/routineload/RoutineLoadManagerTest.java | 30 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java index dfc47aa496e..8454bba7303 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java @@ -76,16 +76,16 @@ public class ScheduleRule { } else { long current = System.currentTimeMillis(); if (current - jobRoutine.latestResumeTimestamp < Config.period_of_auto_resume_min * 60000L) { - long autoResumeIntervalTimeSec = - Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, - MAX_BACK_OFF_TIME_SEC); + long autoResumeIntervalTimeSec = calAutoResumeInterval(jobRoutine); if (current - jobRoutine.latestResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { LOG.info("try to auto reschedule routine load {}, latestResumeTimestamp: {}," + " autoResumeCount: {}, pause reason: {}", jobRoutine.id, jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount, jobRoutine.pauseReason == null ? 
"null" : jobRoutine.pauseReason.getCode().name()); jobRoutine.latestResumeTimestamp = System.currentTimeMillis(); - jobRoutine.autoResumeCount++; + if (jobRoutine.autoResumeCount < Long.MAX_VALUE) { + jobRoutine.autoResumeCount++; + } return true; } } else { @@ -104,4 +104,10 @@ public class ScheduleRule { } return false; } + + public static long calAutoResumeInterval(RoutineLoadJob jobRoutine) { + return jobRoutine.autoResumeCount < 5 + ? Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, + MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 0eeb22ba2cb..d1a47558ef1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -63,6 +63,7 @@ import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.Test; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -1073,4 +1074,33 @@ public class RoutineLoadManagerTest { Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState()); Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState()); } + + @Test + public void testCalAutoResumeInterval() throws Exception { + RoutineLoadJob jobRoutine = new KafkaRoutineLoadJob(); + + Field maxBackOffField = ScheduleRule.class.getDeclaredField("MAX_BACK_OFF_TIME_SEC"); + maxBackOffField.setAccessible(true); + long maxBackOffTimeSec = (long) maxBackOffField.get(null); + + Field backOffBasicField = ScheduleRule.class.getDeclaredField("BACK_OFF_BASIC_TIME_SEC"); + backOffBasicField.setAccessible(true); + long backOffTimeSec = (long) backOffBasicField.get(null); + + jobRoutine.autoResumeCount = 0; + 
long interval = ScheduleRule.calAutoResumeInterval(jobRoutine); + Assert.assertEquals(Math.min((long) Math.pow(2, 0) * backOffTimeSec, maxBackOffTimeSec), interval); + + jobRoutine.autoResumeCount = 1; + interval = ScheduleRule.calAutoResumeInterval(jobRoutine); + Assert.assertEquals(Math.min((long) Math.pow(2, 1) * backOffTimeSec, maxBackOffTimeSec), interval); + + jobRoutine.autoResumeCount = 5; + interval = ScheduleRule.calAutoResumeInterval(jobRoutine); + Assert.assertEquals(maxBackOffTimeSec, interval); + + jobRoutine.autoResumeCount = 1000; + interval = ScheduleRule.calAutoResumeInterval(jobRoutine); + Assert.assertEquals(maxBackOffTimeSec, interval); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org