This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0034c63b86b branch-2.1: [fix](routine load) fix incorrect auto-resume interval caused by excessive auto-resume attempts #47528 (#47811)
0034c63b86b is described below

commit 0034c63b86bfcb0b8040cdac947a10d410922989
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 14 20:36:57 2025 +0800

    branch-2.1: [fix](routine load) fix incorrect auto-resume interval caused by excessive auto-resume attempts #47528 (#47811)
    
    Cherry-picked from #47528
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../doris/load/routineload/ScheduleRule.java       | 14 +++++++---
 .../load/routineload/RoutineLoadManagerTest.java   | 30 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index dfc47aa496e..8454bba7303 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -76,16 +76,16 @@ public class ScheduleRule {
             } else {
                 long current = System.currentTimeMillis();
                 if (current - jobRoutine.latestResumeTimestamp < 
Config.period_of_auto_resume_min * 60000L) {
-                    long autoResumeIntervalTimeSec =
-                            Math.min((long) Math.pow(2, 
jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
-                                    MAX_BACK_OFF_TIME_SEC);
+                    long autoResumeIntervalTimeSec = 
calAutoResumeInterval(jobRoutine);
                     if (current - jobRoutine.latestResumeTimestamp > 
autoResumeIntervalTimeSec * 1000L) {
                         LOG.info("try to auto reschedule routine load {}, 
latestResumeTimestamp: {},"
                                 + "  autoResumeCount: {}, pause reason: {}",
                                 jobRoutine.id, 
jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
                                 jobRoutine.pauseReason == null ? "null" : 
jobRoutine.pauseReason.getCode().name());
                         jobRoutine.latestResumeTimestamp = 
System.currentTimeMillis();
-                        jobRoutine.autoResumeCount++;
+                        if (jobRoutine.autoResumeCount < Long.MAX_VALUE) {
+                            jobRoutine.autoResumeCount++;
+                        }
                         return true;
                     }
                 } else {
@@ -104,4 +104,10 @@ public class ScheduleRule {
         }
         return false;
     }
+
+    public static long calAutoResumeInterval(RoutineLoadJob jobRoutine) {
+        return jobRoutine.autoResumeCount < 5
+                    ? Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) 
* BACK_OFF_BASIC_TIME_SEC,
+                            MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 0eeb22ba2cb..d1a47558ef1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.logging.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -1073,4 +1074,33 @@ public class RoutineLoadManagerTest {
         Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, 
routineLoadJob1.getState());
         Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, 
routineLoadJob2.getState());
     }
+
+    @Test
+    public void testCalAutoResumeInterval() throws Exception {
+        RoutineLoadJob jobRoutine = new KafkaRoutineLoadJob();
+
+        Field maxBackOffField = 
ScheduleRule.class.getDeclaredField("MAX_BACK_OFF_TIME_SEC");
+        maxBackOffField.setAccessible(true);
+        long maxBackOffTimeSec = (long) maxBackOffField.get(null);
+
+        Field backOffBasicField = 
ScheduleRule.class.getDeclaredField("BACK_OFF_BASIC_TIME_SEC");
+        backOffBasicField.setAccessible(true);
+        long backOffTimeSec = (long) backOffBasicField.get(null);
+
+        jobRoutine.autoResumeCount = 0;
+        long interval = ScheduleRule.calAutoResumeInterval(jobRoutine);
+        Assert.assertEquals(Math.min((long) Math.pow(2, 0) * backOffTimeSec, 
maxBackOffTimeSec), interval);
+
+        jobRoutine.autoResumeCount = 1;
+        interval = ScheduleRule.calAutoResumeInterval(jobRoutine);
+        Assert.assertEquals(Math.min((long) Math.pow(2, 1) * backOffTimeSec, 
maxBackOffTimeSec), interval);
+
+        jobRoutine.autoResumeCount = 5;
+        interval = ScheduleRule.calAutoResumeInterval(jobRoutine);
+        Assert.assertEquals(maxBackOffTimeSec, interval);
+
+        jobRoutine.autoResumeCount = 1000;
+        interval = ScheduleRule.calAutoResumeInterval(jobRoutine);
+        Assert.assertEquals(maxBackOffTimeSec, interval);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to