This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e38902db180 branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848) e38902db180 is described below commit e38902db180017e7aa80fa0d67200251b4ac6065 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Feb 14 21:52:03 2025 +0800 branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848) Cherry-picked from #47530 Co-authored-by: hui lai <lai...@selectdb.com> --- .../load/routineload/KafkaRoutineLoadJob.java | 89 +++++++++++++--------- .../doris/load/routineload/RoutineLoadJob.java | 16 +++- .../load/routineload/RoutineLoadManagerTest.java | 3 +- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index df52730a85e..bff1f60be81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -327,18 +327,19 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { } @Override - protected void preCheckNeedSchedule() throws UserException { + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { // If user does not specify kafka partition, // We will fetch partition from kafka server periodically - if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) { if (customKafkaPartitions != null && !customKafkaPartitions.isEmpty()) { - return; + return true; } - updateKafkaPartitions(); + return updateKafkaPartitions(); } + return true; } - private void updateKafkaPartitions() throws UserException { + private boolean updateKafkaPartitions() throws UserException { try { this.newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { @@ -353,7 +354,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg), false /* not replay */); } + return false; } + return true; } // if customKafkaPartition is not null, then return false immediately @@ -365,33 +368,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { protected boolean unprotectNeedReschedule() throws UserException { // only running and need_schedule job need to be changed current kafka partitions if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { - if (CollectionUtils.isNotEmpty(customKafkaPartitions)) { - currentKafkaPartitions = customKafkaPartitions; - return false; - } - // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() - Preconditions.checkNotNull(this.newCurrentKafkaPartition); - if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) { - if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { - currentKafkaPartitions = this.newCurrentKafkaPartition; - if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) - .add("msg", "current kafka partitions has been change") - .build()); - } - return true; - } else { - // if the partitions of currentKafkaPartitions and progress are inconsistent, - // We should also update the progress - for (Integer kafkaPartition : currentKafkaPartitions) { - if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { - return true; - } - } - return false; - } - } else { + return isKafkaPartitionsChanged(); + } + return false; + } + + private boolean isKafkaPartitionsChanged() throws UserException { + if (CollectionUtils.isNotEmpty(customKafkaPartitions)) { + currentKafkaPartitions = customKafkaPartitions; + return false; + } + // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() + Preconditions.checkNotNull(this.newCurrentKafkaPartition); + if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) { + if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { currentKafkaPartitions = this.newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -400,14 +390,39 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { .build()); } return true; + } else { + // if the partitions of currentKafkaPartitions and progress are inconsistent, + // We should also update the progress + for (Integer kafkaPartition : currentKafkaPartitions) { + if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { + return true; + } + } + return false; } - - } - if (this.state == JobState.PAUSED) { - return ScheduleRule.isNeedAutoSchedule(this); + } else { + currentKafkaPartitions = this.newCurrentKafkaPartition; + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); + } + return true; } - return false; + } + @Override + protected boolean needAutoResume() { + writeLock(); + try { + if (this.state == JobState.PAUSED) { + return ScheduleRule.isNeedAutoSchedule(this); + } + return false; + } finally { + writeUnlock(); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index fe3fbbe8d7f..92697f816d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1454,11 +1454,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } } - preCheckNeedSchedule(); + boolean needAutoResume = needAutoResume(); + + if (!refreshKafkaPartitions(needAutoResume)) { + return; + } writeLock(); try { - if (unprotectNeedReschedule()) { + if (unprotectNeedReschedule() || needAutoResume) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("msg", "Job need to be rescheduled") .build()); @@ -1475,8 +1479,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // Because unprotectUpdateProgress() is protected by writelock. // So if there are time-consuming operations, they should be done in this method. // (Such as getAllKafkaPartitions() in KafkaRoutineLoad) - protected void preCheckNeedSchedule() throws UserException { - + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { + return false; } protected void unprotectUpdateProgress() throws UserException { @@ -1486,6 +1490,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return false; } + protected boolean needAutoResume() { + return false; + } + public void setOrigStmt(OriginStatement origStmt) { this.origStmt = origStmt; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index d1a47558ef1..2a9b8e5b1af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -648,8 +648,7 @@ public class RoutineLoadManagerTest { throw new UserException("thread sleep failed"); } routineLoadManager.updateRoutineLoadJob(); - Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); - Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); + Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } routineLoadManager.updateRoutineLoadJob(); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org