This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e38902db180 branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848)
e38902db180 is described below

commit e38902db180017e7aa80fa0d67200251b4ac6065
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 14 21:52:03 2025 +0800

    branch-2.1: [improve](routine load) ensure abnormal jobs do not interfere with normal task scheduling #47530 (#47848)
    
    Cherry-picked from #47530
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../load/routineload/KafkaRoutineLoadJob.java      | 89 +++++++++++++---------
 .../doris/load/routineload/RoutineLoadJob.java     | 16 +++-
 .../load/routineload/RoutineLoadManagerTest.java   |  3 +-
 3 files changed, 65 insertions(+), 43 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index df52730a85e..bff1f60be81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -327,18 +327,19 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected void preCheckNeedSchedule() throws UserException {
+    protected boolean refreshKafkaPartitions(boolean needAutoResume) throws 
UserException {
         // If user does not specify kafka partition,
         // We will fetch partition from kafka server periodically
-        if (this.state == JobState.RUNNING || this.state == 
JobState.NEED_SCHEDULE) {
+        if (this.state == JobState.RUNNING || this.state == 
JobState.NEED_SCHEDULE || needAutoResume) {
             if (customKafkaPartitions != null && 
!customKafkaPartitions.isEmpty()) {
-                return;
+                return true;
             }
-            updateKafkaPartitions();
+            return updateKafkaPartitions();
         }
+        return true;
     }
 
-    private void updateKafkaPartitions() throws UserException {
+    private boolean updateKafkaPartitions() throws UserException {
         try {
             this.newCurrentKafkaPartition = getAllKafkaPartitions();
         } catch (Exception e) {
@@ -353,7 +354,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
                         false /* not replay */);
             }
+            return false;
         }
+        return true;
     }
 
     // if customKafkaPartition is not null, then return false immediately
@@ -365,33 +368,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     protected boolean unprotectNeedReschedule() throws UserException {
         // only running and need_schedule job need to be changed current kafka 
partitions
         if (this.state == JobState.RUNNING || this.state == 
JobState.NEED_SCHEDULE) {
-            if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
-                currentKafkaPartitions = customKafkaPartitions;
-                return false;
-            }
-            // the newCurrentKafkaPartition should be already updated in 
preCheckNeedScheduler()
-            Preconditions.checkNotNull(this.newCurrentKafkaPartition);
-            if (new 
HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
-                if (currentKafkaPartitions.size() > 
this.newCurrentKafkaPartition.size()) {
-                    currentKafkaPartitions = this.newCurrentKafkaPartition;
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                                .add("current_kafka_partitions", 
Joiner.on(",").join(currentKafkaPartitions))
-                                .add("msg", "current kafka partitions has been 
change")
-                                .build());
-                    }
-                    return true;
-                } else {
-                    // if the partitions of currentKafkaPartitions and 
progress are inconsistent,
-                    // We should also update the progress
-                    for (Integer kafkaPartition : currentKafkaPartitions) {
-                        if (!((KafkaProgress) 
progress).containsPartition(kafkaPartition)) {
-                            return true;
-                        }
-                    }
-                    return false;
-                }
-            } else {
+            return isKafkaPartitionsChanged();
+        }
+        return false;
+    }
+
+    private boolean isKafkaPartitionsChanged() throws UserException {
+        if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
+            currentKafkaPartitions = customKafkaPartitions;
+            return false;
+        }
+        // the newCurrentKafkaPartition should be already updated in 
preCheckNeedScheduler()
+        Preconditions.checkNotNull(this.newCurrentKafkaPartition);
+        if (new 
HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) {
+            if (currentKafkaPartitions.size() > 
this.newCurrentKafkaPartition.size()) {
                 currentKafkaPartitions = this.newCurrentKafkaPartition;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -400,14 +390,39 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                             .build());
                 }
                 return true;
+            } else {
+                // if the partitions of currentKafkaPartitions and progress 
are inconsistent,
+                // We should also update the progress
+                for (Integer kafkaPartition : currentKafkaPartitions) {
+                    if (!((KafkaProgress) 
progress).containsPartition(kafkaPartition)) {
+                        return true;
+                    }
+                }
+                return false;
             }
-
-        }
-        if (this.state == JobState.PAUSED) {
-            return ScheduleRule.isNeedAutoSchedule(this);
+        } else {
+            currentKafkaPartitions = this.newCurrentKafkaPartition;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("current_kafka_partitions", 
Joiner.on(",").join(currentKafkaPartitions))
+                        .add("msg", "current kafka partitions has been change")
+                        .build());
+            }
+            return true;
         }
-        return false;
+    }
 
+    @Override
+    protected boolean needAutoResume() {
+        writeLock();
+        try {
+            if (this.state == JobState.PAUSED) {
+                return ScheduleRule.isNeedAutoSchedule(this);
+            }
+            return false;
+        } finally {
+            writeUnlock();
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index fe3fbbe8d7f..92697f816d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1454,11 +1454,15 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             }
         }
 
-        preCheckNeedSchedule();
+        boolean needAutoResume = needAutoResume();
+
+        if (!refreshKafkaPartitions(needAutoResume)) {
+            return;
+        }
 
         writeLock();
         try {
-            if (unprotectNeedReschedule()) {
+            if (unprotectNeedReschedule() || needAutoResume) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                         .add("msg", "Job need to be rescheduled")
                         .build());
@@ -1475,8 +1479,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     // Because unprotectUpdateProgress() is protected by writelock.
     // So if there are time-consuming operations, they should be done in this 
method.
     // (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
-    protected void preCheckNeedSchedule() throws UserException {
-
+    protected boolean refreshKafkaPartitions(boolean needAutoResume) throws 
UserException {
+        return false;
     }
 
     protected void unprotectUpdateProgress() throws UserException {
@@ -1486,6 +1490,10 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         return false;
     }
 
+    protected boolean needAutoResume() {
+        return false;
+    }
+
     public void setOrigStmt(OriginStatement origStmt) {
         this.origStmt = origStmt;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index d1a47558ef1..2a9b8e5b1af 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -648,8 +648,7 @@ public class RoutineLoadManagerTest {
                 throw new UserException("thread sleep failed");
             }
             routineLoadManager.updateRoutineLoadJob();
-            Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, 
routineLoadJob.getState());
-            Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.PAUSED);
+            Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());
         }
         routineLoadManager.updateRoutineLoadJob();
         Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, 
routineLoadJob.getState());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to