This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e38902db180 branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848)
e38902db180 is described below
commit e38902db180017e7aa80fa0d67200251b4ac6065
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 14 21:52:03 2025 +0800
branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848)
Cherry-picked from #47530
Co-authored-by: hui lai <[email protected]>
---
.../load/routineload/KafkaRoutineLoadJob.java | 89 +++++++++++++---------
.../doris/load/routineload/RoutineLoadJob.java | 16 +++-
.../load/routineload/RoutineLoadManagerTest.java | 3 +-
3 files changed, 65 insertions(+), 43 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index df52730a85e..bff1f60be81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -327,18 +327,19 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected void preCheckNeedSchedule() throws UserException {
+ protected boolean refreshKafkaPartitions(boolean needAutoResume) throws
UserException {
// If user does not specify kafka partition,
// We will fetch partition from kafka server periodically
- if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE) {
+ if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE || needAutoResume) {
if (customKafkaPartitions != null &&
!customKafkaPartitions.isEmpty()) {
- return;
+ return true;
}
- updateKafkaPartitions();
+ return updateKafkaPartitions();
}
+ return true;
}
- private void updateKafkaPartitions() throws UserException {
+ private boolean updateKafkaPartitions() throws UserException {
try {
this.newCurrentKafkaPartition = getAllKafkaPartitions();
} catch (Exception e) {
@@ -353,7 +354,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
false /* not replay */);
}
+ return false;
}
+ return true;
}
// if customKafkaPartition is not null, then return false immediately
@@ -365,33 +368,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
protected boolean unprotectNeedReschedule() throws UserException {
// only running and need_schedule job need to be changed current kafka
partitions
if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE) {
- if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
- currentKafkaPartitions = customKafkaPartitions;
- return false;
- }
- // the newCurrentKafkaPartition should be already updated in
preCheckNeedScheduler()
- Preconditions.checkNotNull(this.newCurrentKafkaPartition);
- if (new
HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
- if (currentKafkaPartitions.size() >
this.newCurrentKafkaPartition.size()) {
- currentKafkaPartitions = this.newCurrentKafkaPartition;
- if (LOG.isDebugEnabled()) {
- LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
- .add("current_kafka_partitions",
Joiner.on(",").join(currentKafkaPartitions))
- .add("msg", "current kafka partitions has been
change")
- .build());
- }
- return true;
- } else {
- // if the partitions of currentKafkaPartitions and
progress are inconsistent,
- // We should also update the progress
- for (Integer kafkaPartition : currentKafkaPartitions) {
- if (!((KafkaProgress)
progress).containsPartition(kafkaPartition)) {
- return true;
- }
- }
- return false;
- }
- } else {
+ return isKafkaPartitionsChanged();
+ }
+ return false;
+ }
+
+ private boolean isKafkaPartitionsChanged() throws UserException {
+ if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
+ currentKafkaPartitions = customKafkaPartitions;
+ return false;
+ }
+ // the newCurrentKafkaPartition should be already updated in
preCheckNeedScheduler()
+ Preconditions.checkNotNull(this.newCurrentKafkaPartition);
+ if (new
HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
+ if (currentKafkaPartitions.size() >
this.newCurrentKafkaPartition.size()) {
currentKafkaPartitions = this.newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -400,14 +390,39 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
.build());
}
return true;
+ } else {
+ // if the partitions of currentKafkaPartitions and progress
are inconsistent,
+ // We should also update the progress
+ for (Integer kafkaPartition : currentKafkaPartitions) {
+ if (!((KafkaProgress)
progress).containsPartition(kafkaPartition)) {
+ return true;
+ }
+ }
+ return false;
}
-
- }
- if (this.state == JobState.PAUSED) {
- return ScheduleRule.isNeedAutoSchedule(this);
+ } else {
+ currentKafkaPartitions = this.newCurrentKafkaPartition;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+ .add("current_kafka_partitions",
Joiner.on(",").join(currentKafkaPartitions))
+ .add("msg", "current kafka partitions has been change")
+ .build());
+ }
+ return true;
}
- return false;
+ }
+ @Override
+ protected boolean needAutoResume() {
+ writeLock();
+ try {
+ if (this.state == JobState.PAUSED) {
+ return ScheduleRule.isNeedAutoSchedule(this);
+ }
+ return false;
+ } finally {
+ writeUnlock();
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index fe3fbbe8d7f..92697f816d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1454,11 +1454,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
- preCheckNeedSchedule();
+ boolean needAutoResume = needAutoResume();
+
+ if (!refreshKafkaPartitions(needAutoResume)) {
+ return;
+ }
writeLock();
try {
- if (unprotectNeedReschedule()) {
+ if (unprotectNeedReschedule() || needAutoResume) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("msg", "Job need to be rescheduled")
.build());
@@ -1475,8 +1479,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// Because unprotectUpdateProgress() is protected by writelock.
// So if there are time-consuming operations, they should be done in this
method.
// (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
- protected void preCheckNeedSchedule() throws UserException {
-
+ protected boolean refreshKafkaPartitions(boolean needAutoResume) throws
UserException {
+ return false;
}
protected void unprotectUpdateProgress() throws UserException {
@@ -1486,6 +1490,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return false;
}
+ protected boolean needAutoResume() {
+ return false;
+ }
+
public void setOrigStmt(OriginStatement origStmt) {
this.origStmt = origStmt;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index d1a47558ef1..2a9b8e5b1af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -648,8 +648,7 @@ public class RoutineLoadManagerTest {
throw new UserException("thread sleep failed");
}
routineLoadManager.updateRoutineLoadJob();
- Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE,
routineLoadJob.getState());
- Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.PAUSED);
+ Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
routineLoadJob.getState());
}
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
routineLoadJob.getState());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org