This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1492a558996 [improve](routine-load) optimize routine load job auto 
resume policy (#35266)
1492a558996 is described below

commit 1492a55899667c97c8714d86bf9795e3cdd1899f
Author: hui lai <1353307...@qq.com>
AuthorDate: Fri Jun 7 22:11:27 2024 +0800

    [improve](routine-load) optimize routine load job auto resume policy 
(#35266)
    
    ## Proposed changes
    
    When an exception occurs, such as `get offset failed` caused by network
    isolation or a BE node going down during an upgrade, the job pauses
    unexpectedly. Therefore, Doris introduced auto resume to keep jobs stable in
    https://github.com/apache/doris/pull/2958.
    
    **But currently there are some issues with this policy:**
    
    - If fault recovery takes a relatively long time, such as when a BE is
    down, then after three auto-resume attempts the job can no longer be
    resumed automatically, even though in reality another automatic resume
    would restore it.
    
    **Improvement measures:**
    
    - Allow an unlimited number of retry attempts, so that automatic resume
    still works when recovery takes a long time.
    
    - Add a backoff algorithm to avoid pointless resume attempts, such as when
    Kafka has problems but Doris keeps resuming at a high frequency. The
    maximum backoff time is capped at five minutes so that an overly long
    backoff does not itself delay recovery.
---
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  3 +-
 .../doris/load/routineload/RoutineLoadManager.java |  3 +-
 .../doris/load/routineload/ScheduleRule.java       | 40 ++++++++++------------
 .../load/routineload/RoutineLoadManagerTest.java   | 11 +++---
 5 files changed, 27 insertions(+), 32 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 615e034e568..96f91a97d17 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1267,7 +1267,7 @@ public class Config extends ConfigBase {
      * a period for auto resume routine load
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int period_of_auto_resume_min = 5;
+    public static int period_of_auto_resume_min = 10;
 
     /**
      * If set to true, the backend will be automatically dropped after 
finishing decommission.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0954d0bac36..2b92ad63459 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     protected int currentTaskConcurrentNum;
     protected RoutineLoadProgress progress;
 
-    protected long firstResumeTimestamp; // the first resume time
+    protected long latestResumeTimestamp; // the latest resume time
     protected long autoResumeCount;
-    protected boolean autoResumeLock = false; //it can't auto resume iff true
     // some other msg which need to show to user;
     protected String otherMsg = "";
     protected ErrorReason pauseReason;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 871781737d5..e318ed77bb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -368,8 +368,7 @@ public class RoutineLoadManager implements Writable {
             try {
                 routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
                 routineLoadJob.autoResumeCount = 0;
-                routineLoadJob.firstResumeTimestamp = 0;
-                routineLoadJob.autoResumeLock = false;
+                routineLoadJob.latestResumeTimestamp = 0;
                 
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false 
/* not replay */);
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
                         .add("current_state", routineLoadJob.getState())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index 2b4ef7b297a..dfc47aa496e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger;
 public class ScheduleRule {
     private static final Logger LOG = LogManager.getLogger(ScheduleRule.class);
 
+    private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
+
+    private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
+
     private static int deadBeCount() {
         SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
         int total = systemInfoService.getAllBackendIds(false).size();
@@ -47,22 +51,10 @@ public class ScheduleRule {
         if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
             return false;
         }
-        if (jobRoutine.autoResumeLock) { //only manual resume for unlock
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("routine load job {}'s autoResumeLock is true, 
skip", jobRoutine.id);
-            }
-            return false;
-        }
 
         /*
          * Handle all backends are down.
          */
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("try to auto reschedule routine load {}, 
firstResumeTimestamp: {}, autoResumeCount: {}, "
-                            + "pause reason: {}",
-                    jobRoutine.id, jobRoutine.firstResumeTimestamp, 
jobRoutine.autoResumeCount,
-                    jobRoutine.pauseReason == null ? "null" : 
jobRoutine.pauseReason.getCode().name());
-        }
         if (jobRoutine.pauseReason != null
                 && jobRoutine.pauseReason.getCode() != 
InternalErrorCode.MANUAL_PAUSE_ERR
                 && jobRoutine.pauseReason.getCode() != 
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
@@ -77,19 +69,25 @@ public class ScheduleRule {
                 return false;
             }
 
-            if (jobRoutine.firstResumeTimestamp == 0) { //the first resume
-                jobRoutine.firstResumeTimestamp = System.currentTimeMillis();
+            if (jobRoutine.latestResumeTimestamp == 0) { //the first resume
+                jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
                 jobRoutine.autoResumeCount = 1;
                 return true;
             } else {
                 long current = System.currentTimeMillis();
-                if (current - jobRoutine.firstResumeTimestamp < 
Config.period_of_auto_resume_min * 60000L) {
-                    if (jobRoutine.autoResumeCount >= 3) {
-                        jobRoutine.autoResumeLock = true; // locked Auto 
Resume RoutineLoadJob
-                        return false;
+                if (current - jobRoutine.latestResumeTimestamp < 
Config.period_of_auto_resume_min * 60000L) {
+                    long autoResumeIntervalTimeSec =
+                            Math.min((long) Math.pow(2, 
jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
+                                    MAX_BACK_OFF_TIME_SEC);
+                    if (current - jobRoutine.latestResumeTimestamp > 
autoResumeIntervalTimeSec * 1000L) {
+                        LOG.info("try to auto reschedule routine load {}, 
latestResumeTimestamp: {},"
+                                + "  autoResumeCount: {}, pause reason: {}",
+                                jobRoutine.id, 
jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
+                                jobRoutine.pauseReason == null ? "null" : 
jobRoutine.pauseReason.getCode().name());
+                        jobRoutine.latestResumeTimestamp = 
System.currentTimeMillis();
+                        jobRoutine.autoResumeCount++;
+                        return true;
                     }
-                    jobRoutine.autoResumeCount++;
-                    return true;
                 } else {
                     /**
                      * for example:
@@ -98,7 +96,7 @@ public class ScheduleRule {
                      *       the third resume time at 10:20
                      *           --> we must be reset counter because a new 
period for AutoResume RoutineLoadJob
                      */
-                    jobRoutine.firstResumeTimestamp = current;
+                    jobRoutine.latestResumeTimestamp = current;
                     jobRoutine.autoResumeCount = 1;
                     return true;
                 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 5adb1f0fc54..01449d887e5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -638,21 +638,20 @@ public class RoutineLoadManagerTest {
 
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());
 
-        // 第一次自动恢复
         for (int i = 0; i < 3; i++) {
             Deencapsulation.setField(routineLoadJob, "pauseReason",
                     new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, ""));
+            try {
+                Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L));
+            } catch (InterruptedException e) {
+                throw new UserException("thread sleep failed");
+            }
             routineLoadManager.updateRoutineLoadJob();
             Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, 
routineLoadJob.getState());
             Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.PAUSED);
-            boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, 
"autoResumeLock");
-            Assert.assertEquals(autoResumeLock, false);
         }
-        // 第四次自动恢复 就会锁定
         routineLoadManager.updateRoutineLoadJob();
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());
-        boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, 
"autoResumeLock");
-        Assert.assertEquals(autoResumeLock, true);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to