This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0034c63b86b branch-2.1: [fix](routine load) fix incorrect auto-resume
interval caused by excessive auto-resume attempts #47528 (#47811)
0034c63b86b is described below
commit 0034c63b86bfcb0b8040cdac947a10d410922989
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 14 20:36:57 2025 +0800
branch-2.1: [fix](routine load) fix incorrect auto-resume interval caused
by excessive auto-resume attempts #47528 (#47811)
Cherry-picked from #47528
Co-authored-by: hui lai <[email protected]>
---
.../doris/load/routineload/ScheduleRule.java | 14 +++++++---
.../load/routineload/RoutineLoadManagerTest.java | 30 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index dfc47aa496e..8454bba7303 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -76,16 +76,16 @@ public class ScheduleRule {
} else {
long current = System.currentTimeMillis();
if (current - jobRoutine.latestResumeTimestamp <
Config.period_of_auto_resume_min * 60000L) {
- long autoResumeIntervalTimeSec =
- Math.min((long) Math.pow(2,
jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
- MAX_BACK_OFF_TIME_SEC);
+ long autoResumeIntervalTimeSec =
calAutoResumeInterval(jobRoutine);
if (current - jobRoutine.latestResumeTimestamp >
autoResumeIntervalTimeSec * 1000L) {
LOG.info("try to auto reschedule routine load {},
latestResumeTimestamp: {},"
+ " autoResumeCount: {}, pause reason: {}",
jobRoutine.id,
jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
jobRoutine.latestResumeTimestamp =
System.currentTimeMillis();
- jobRoutine.autoResumeCount++;
+ if (jobRoutine.autoResumeCount < Long.MAX_VALUE) {
+ jobRoutine.autoResumeCount++;
+ }
return true;
}
} else {
@@ -104,4 +104,10 @@ public class ScheduleRule {
}
return false;
}
+
+ public static long calAutoResumeInterval(RoutineLoadJob jobRoutine) {
+ return jobRoutine.autoResumeCount < 5
+ ? Math.min((long) Math.pow(2, jobRoutine.autoResumeCount)
* BACK_OFF_BASIC_TIME_SEC,
+ MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 0eeb22ba2cb..d1a47558ef1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -1073,4 +1074,33 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE,
routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE,
routineLoadJob2.getState());
}
+
+    @Test
+    public void testCalAutoResumeInterval() throws Exception {
+        RoutineLoadJob jobRoutine = new KafkaRoutineLoadJob();
+
+        // The back-off constants are read reflectively, as in the original test.
+        // Going through Number keeps the test valid whether they are declared int or long
+        // (a direct (long) cast of the boxed value throws ClassCastException for an Integer).
+        Field maxBackOffField = ScheduleRule.class.getDeclaredField("MAX_BACK_OFF_TIME_SEC");
+        maxBackOffField.setAccessible(true);
+        long maxBackOffTimeSec = ((Number) maxBackOffField.get(null)).longValue();
+
+        Field backOffBasicField = ScheduleRule.class.getDeclaredField("BACK_OFF_BASIC_TIME_SEC");
+        backOffBasicField.setAccessible(true);
+        long backOffTimeSec = ((Number) backOffBasicField.get(null)).longValue();
+
+        // Early attempts follow exponential back-off from the base interval.
+        jobRoutine.autoResumeCount = 0;
+        Assert.assertEquals(Math.min(backOffTimeSec, maxBackOffTimeSec),
+                ScheduleRule.calAutoResumeInterval(jobRoutine));
+
+        jobRoutine.autoResumeCount = 1;
+        Assert.assertEquals(Math.min(2 * backOffTimeSec, maxBackOffTimeSec),
+                ScheduleRule.calAutoResumeInterval(jobRoutine));
+
+        // After enough attempts the interval is pinned to the cap.
+        jobRoutine.autoResumeCount = 5;
+        Assert.assertEquals(maxBackOffTimeSec, ScheduleRule.calAutoResumeInterval(jobRoutine));
+
+        // Large counts used to overflow into a negative interval: Math.pow saturated to
+        // Long.MAX_VALUE and was then multiplied. They must stay at the cap.
+        jobRoutine.autoResumeCount = 1000;
+        Assert.assertEquals(maxBackOffTimeSec, ScheduleRule.calAutoResumeInterval(jobRoutine));
+
+        jobRoutine.autoResumeCount = Integer.MAX_VALUE;
+        Assert.assertEquals(maxBackOffTimeSec, ScheduleRule.calAutoResumeInterval(jobRoutine));
+
+        // Across the range the interval must be positive, capped and never shrinking.
+        long previous = 0;
+        for (int count = 0; count <= 100; count++) {
+            jobRoutine.autoResumeCount = count;
+            long interval = ScheduleRule.calAutoResumeInterval(jobRoutine);
+            Assert.assertTrue("interval must be positive at count " + count, interval > 0);
+            Assert.assertTrue("interval must not exceed the cap at count " + count,
+                    interval <= maxBackOffTimeSec);
+            Assert.assertTrue("interval must not shrink at count " + count, interval >= previous);
+            previous = interval;
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]