This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1492a558996 [improve](routine-load) optimize routine load job auto resume policy (#35266) 1492a558996 is described below commit 1492a55899667c97c8714d86bf9795e3cdd1899f Author: hui lai <1353307...@qq.com> AuthorDate: Fri Jun 7 22:11:27 2024 +0800 [improve](routine-load) optimize routine load job auto resume policy (#35266) ## Proposed changes When exceptions occur (for example, `get offset failed` caused by network isolation, or a BE node going down during an upgrade), the job pauses unexpectedly. To keep jobs stable, Doris introduced auto resume in https://github.com/apache/doris/pull/2958. **However, the current policy has an issue:** - If fault recovery takes a relatively long time, such as when a BE is down, the job can no longer be resumed automatically after three auto-resume attempts, even though a later automatic resume would in fact have recovered it. **Improvements:** - Allow an unlimited number of auto-resume attempts, so that jobs still resume automatically after a long recovery time. - Add a backoff algorithm, with the wait between attempts growing exponentially from a 10-second base, so that Doris does not keep resuming at a high frequency when resumes cannot succeed (for example, while Kafka itself is having problems). The maximum backoff time is five minutes, so that an overly long backoff does not delay recovery. 
--- .../main/java/org/apache/doris/common/Config.java | 2 +- .../doris/load/routineload/RoutineLoadJob.java | 3 +- .../doris/load/routineload/RoutineLoadManager.java | 3 +- .../doris/load/routineload/ScheduleRule.java | 40 ++++++++++------------ .../load/routineload/RoutineLoadManagerTest.java | 11 +++--- 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 615e034e568..96f91a97d17 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1267,7 +1267,7 @@ public class Config extends ConfigBase { * a period for auto resume routine load */ @ConfField(mutable = true, masterOnly = true) - public static int period_of_auto_resume_min = 5; + public static int period_of_auto_resume_min = 10; /** * If set to true, the backend will be automatically dropped after finishing decommission. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0954d0bac36..2b92ad63459 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; - protected long firstResumeTimestamp; // the first resume time + protected long latestResumeTimestamp; // the latest resume time protected long autoResumeCount; - protected boolean autoResumeLock = false; //it can't auto resume iff true // some other msg which need to show to user; protected String otherMsg = ""; protected ErrorReason pauseReason; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 871781737d5..e318ed77bb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -368,8 +368,7 @@ public class RoutineLoadManager implements Writable { try { routineLoadJob.jobStatistic.errorRowsAfterResumed = 0; routineLoadJob.autoResumeCount = 0; - routineLoadJob.firstResumeTimestamp = 0; - routineLoadJob.autoResumeLock = false; + routineLoadJob.latestResumeTimestamp = 0; routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) .add("current_state", routineLoadJob.getState()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java 
index 2b4ef7b297a..dfc47aa496e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java @@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger; public class ScheduleRule { private static final Logger LOG = LogManager.getLogger(ScheduleRule.class); + private static final long BACK_OFF_BASIC_TIME_SEC = 10L; + + private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5; + private static int deadBeCount() { SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); int total = systemInfoService.getAllBackendIds(false).size(); @@ -47,22 +51,10 @@ public class ScheduleRule { if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) { return false; } - if (jobRoutine.autoResumeLock) { //only manual resume for unlock - if (LOG.isDebugEnabled()) { - LOG.debug("routine load job {}'s autoResumeLock is true, skip", jobRoutine.id); - } - return false; - } /* * Handle all backends are down. */ - if (LOG.isDebugEnabled()) { - LOG.debug("try to auto reschedule routine load {}, firstResumeTimestamp: {}, autoResumeCount: {}, " - + "pause reason: {}", - jobRoutine.id, jobRoutine.firstResumeTimestamp, jobRoutine.autoResumeCount, - jobRoutine.pauseReason == null ? 
"null" : jobRoutine.pauseReason.getCode().name()); - } if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR && jobRoutine.pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR @@ -77,19 +69,25 @@ public class ScheduleRule { return false; } - if (jobRoutine.firstResumeTimestamp == 0) { //the first resume - jobRoutine.firstResumeTimestamp = System.currentTimeMillis(); + if (jobRoutine.latestResumeTimestamp == 0) { //the first resume + jobRoutine.latestResumeTimestamp = System.currentTimeMillis(); jobRoutine.autoResumeCount = 1; return true; } else { long current = System.currentTimeMillis(); - if (current - jobRoutine.firstResumeTimestamp < Config.period_of_auto_resume_min * 60000L) { - if (jobRoutine.autoResumeCount >= 3) { - jobRoutine.autoResumeLock = true; // locked Auto Resume RoutineLoadJob - return false; + if (current - jobRoutine.latestResumeTimestamp < Config.period_of_auto_resume_min * 60000L) { + long autoResumeIntervalTimeSec = + Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, + MAX_BACK_OFF_TIME_SEC); + if (current - jobRoutine.latestResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { + LOG.info("try to auto reschedule routine load {}, latestResumeTimestamp: {}," + + " autoResumeCount: {}, pause reason: {}", + jobRoutine.id, jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount, + jobRoutine.pauseReason == null ? 
"null" : jobRoutine.pauseReason.getCode().name()); + jobRoutine.latestResumeTimestamp = System.currentTimeMillis(); + jobRoutine.autoResumeCount++; + return true; } - jobRoutine.autoResumeCount++; - return true; } else { /** * for example: @@ -98,7 +96,7 @@ public class ScheduleRule { * the third resume time at 10:20 * --> we must be reset counter because a new period for AutoResume RoutineLoadJob */ - jobRoutine.firstResumeTimestamp = current; + jobRoutine.latestResumeTimestamp = current; jobRoutine.autoResumeCount = 1; return true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 5adb1f0fc54..01449d887e5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -638,21 +638,20 @@ public class RoutineLoadManagerTest { Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); - // 第一次自动恢复 for (int i = 0; i < 3; i++) { Deencapsulation.setField(routineLoadJob, "pauseReason", new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, "")); + try { + Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L)); + } catch (InterruptedException e) { + throw new UserException("thread sleep failed"); + } routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); - boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); - Assert.assertEquals(autoResumeLock, false); } - // 第四次自动恢复 就会锁定 routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); - boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); - 
Assert.assertEquals(autoResumeLock, true); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org