This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d08a418dd85 [branch-2.1](routine-load) optimize routine load job auto resume policy (#37373)
d08a418dd85 is described below

commit d08a418dd85d6eb5f01aecd1f5aaa7568c781ef1
Author: hui lai <1353307...@qq.com>
AuthorDate: Sun Jul 7 18:16:56 2024 +0800

    [branch-2.1](routine-load) optimize routine load job auto resume policy (#37373)
    
    pick #35266
---
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  3 +-
 .../doris/load/routineload/RoutineLoadManager.java |  3 +-
 .../doris/load/routineload/ScheduleRule.java       | 40 ++++++++++------------
 .../load/routineload/RoutineLoadManagerTest.java   | 11 +++---
 5 files changed, 27 insertions(+), 32 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 26ef8852b45..94d5725c38a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1257,7 +1257,7 @@ public class Config extends ConfigBase {
      * a period for auto resume routine load
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int period_of_auto_resume_min = 5;
+    public static int period_of_auto_resume_min = 10;
 
     /**
      * If set to true, the backend will be automatically dropped after 
finishing decommission.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8c8dd7eaad9..130bd87b018 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     protected int currentTaskConcurrentNum;
     protected RoutineLoadProgress progress;
 
-    protected long firstResumeTimestamp; // the first resume time
+    protected long latestResumeTimestamp; // the latest resume time
     protected long autoResumeCount;
-    protected boolean autoResumeLock = false; //it can't auto resume iff true
     // some other msg which need to show to user;
     protected String otherMsg = "";
     protected ErrorReason pauseReason;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 356262f8c2a..6121f34035c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -367,8 +367,7 @@ public class RoutineLoadManager implements Writable {
             try {
                 routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
                 routineLoadJob.autoResumeCount = 0;
-                routineLoadJob.firstResumeTimestamp = 0;
-                routineLoadJob.autoResumeLock = false;
+                routineLoadJob.latestResumeTimestamp = 0;
                 
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false 
/* not replay */);
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
                         .add("current_state", routineLoadJob.getState())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index 2b4ef7b297a..dfc47aa496e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger;
 public class ScheduleRule {
     private static final Logger LOG = LogManager.getLogger(ScheduleRule.class);
 
+    private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
+
+    private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
+
     private static int deadBeCount() {
         SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
         int total = systemInfoService.getAllBackendIds(false).size();
@@ -47,22 +51,10 @@ public class ScheduleRule {
         if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
             return false;
         }
-        if (jobRoutine.autoResumeLock) { //only manual resume for unlock
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("routine load job {}'s autoResumeLock is true, 
skip", jobRoutine.id);
-            }
-            return false;
-        }
 
         /*
          * Handle all backends are down.
          */
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("try to auto reschedule routine load {}, 
firstResumeTimestamp: {}, autoResumeCount: {}, "
-                            + "pause reason: {}",
-                    jobRoutine.id, jobRoutine.firstResumeTimestamp, 
jobRoutine.autoResumeCount,
-                    jobRoutine.pauseReason == null ? "null" : 
jobRoutine.pauseReason.getCode().name());
-        }
         if (jobRoutine.pauseReason != null
                 && jobRoutine.pauseReason.getCode() != 
InternalErrorCode.MANUAL_PAUSE_ERR
                 && jobRoutine.pauseReason.getCode() != 
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
@@ -77,19 +69,25 @@ public class ScheduleRule {
                 return false;
             }
 
-            if (jobRoutine.firstResumeTimestamp == 0) { //the first resume
-                jobRoutine.firstResumeTimestamp = System.currentTimeMillis();
+            if (jobRoutine.latestResumeTimestamp == 0) { //the first resume
+                jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
                 jobRoutine.autoResumeCount = 1;
                 return true;
             } else {
                 long current = System.currentTimeMillis();
-                if (current - jobRoutine.firstResumeTimestamp < 
Config.period_of_auto_resume_min * 60000L) {
-                    if (jobRoutine.autoResumeCount >= 3) {
-                        jobRoutine.autoResumeLock = true; // locked Auto 
Resume RoutineLoadJob
-                        return false;
+                if (current - jobRoutine.latestResumeTimestamp < 
Config.period_of_auto_resume_min * 60000L) {
+                    long autoResumeIntervalTimeSec =
+                            Math.min((long) Math.pow(2, 
jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
+                                    MAX_BACK_OFF_TIME_SEC);
+                    if (current - jobRoutine.latestResumeTimestamp > 
autoResumeIntervalTimeSec * 1000L) {
+                        LOG.info("try to auto reschedule routine load {}, 
latestResumeTimestamp: {},"
+                                + "  autoResumeCount: {}, pause reason: {}",
+                                jobRoutine.id, 
jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
+                                jobRoutine.pauseReason == null ? "null" : 
jobRoutine.pauseReason.getCode().name());
+                        jobRoutine.latestResumeTimestamp = 
System.currentTimeMillis();
+                        jobRoutine.autoResumeCount++;
+                        return true;
                     }
-                    jobRoutine.autoResumeCount++;
-                    return true;
                 } else {
                     /**
                      * for example:
@@ -98,7 +96,7 @@ public class ScheduleRule {
                      *       the third resume time at 10:20
                      *           --> we must be reset counter because a new 
period for AutoResume RoutineLoadJob
                      */
-                    jobRoutine.firstResumeTimestamp = current;
+                    jobRoutine.latestResumeTimestamp = current;
                     jobRoutine.autoResumeCount = 1;
                     return true;
                 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 737936191e8..0eeb22ba2cb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -638,21 +638,20 @@ public class RoutineLoadManagerTest {
 
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());
 
-        // 第一次自动恢复
         for (int i = 0; i < 3; i++) {
             Deencapsulation.setField(routineLoadJob, "pauseReason",
                     new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, ""));
+            try {
+                Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L));
+            } catch (InterruptedException e) {
+                throw new UserException("thread sleep failed");
+            }
             routineLoadManager.updateRoutineLoadJob();
             Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, 
routineLoadJob.getState());
             Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.PAUSED);
-            boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, 
"autoResumeLock");
-            Assert.assertEquals(autoResumeLock, false);
         }
-        // 第四次自动恢复 就会锁定
         routineLoadManager.updateRoutineLoadJob();
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());
-        boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, 
"autoResumeLock");
-        Assert.assertEquals(autoResumeLock, true);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to