This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 597bb696252 branch-3.0: [enhance](job) timely rescheduling tasks to avoid write jitter #53853 (#54054)
597bb696252 is described below

commit 597bb6962523bf7d5a3a38fba3330154719b84ce
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 09:51:11 2025 +0800

    branch-3.0: [enhance](job) timely rescheduling tasks to avoid write jitter #53853 (#54054)
    
    Cherry-picked from #53853
    
    Co-authored-by: hui lai <[email protected]>
---
 .../load/routineload/RoutineLoadTaskScheduler.java |  37 +++++--
 .../test_routine_load_job_schedule.groovy          | 118 +++++++++++++++++++++
 2 files changed, 144 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 040ca103004..28e03567765 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -228,23 +229,32 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 }
             }
         } catch (LoadException e) {
-            // submit task failed (such as TOO_MANY_TASKS error), but txn has 
already begun.
-            // Here we will still set the ExecuteStartTime of this task, which 
means
-            // we "assume" that this task has been successfully submitted.
-            // And this task will then be aborted because of a timeout.
-            // In this way, we can prevent the entire job from being paused 
due to submit errors,
-            // and we can also relieve the pressure on BE by waiting for the 
timeout period.
-            LOG.warn("failed to submit routine load task {} to BE: {}, error: 
{}",
-                    DebugUtil.printId(routineLoadTaskInfo.getId()),
-                    routineLoadTaskInfo.getBeId(), e.getMessage());
-            
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
-            // fall through to set ExecuteStartTime
+            handleSubmitTaskFailure(routineLoadTaskInfo, e.getMessage());
+            return;
         }
 
         // set the executeStartTimeMs of task
         routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
     }
 
+    /**
+     * Handles a failed attempt to submit a routine load task to a BE.
+     *
+     * <p>The failure is logged, recorded as the job's "other msg", and the task's BE assignment
+     * is cleared. Resource-pressure errors ({@code TOO_MANY_TASKS}, {@code MEM_LIMIT_EXCEEDED})
+     * are not retried right away. Instead the task is marked as started, so it is aborted by the
+     * regular timeout check, which relieves pressure on the BE. Any other error (network issues,
+     * BE restart, ...) renews the task and puts it back into the scheduling queue immediately.
+     *
+     * @param routineLoadTaskInfo the task whose submission failed
+     * @param errorMsg the failure message; may be {@code null} if the exception carried no message
+     */
+    private void handleSubmitTaskFailure(RoutineLoadTaskInfo routineLoadTaskInfo, String errorMsg) {
+        LOG.warn("failed to submit routine load task {} to BE: {}, error: {}",
+                DebugUtil.printId(routineLoadTaskInfo.getId()),
+                routineLoadTaskInfo.getBeId(), errorMsg);
+        routineLoadTaskInfo.setBeId(-1);
+        RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
+        if (routineLoadJob == null) {
+            // NOTE(review): assumes getJob() returns null when the job has been removed concurrently;
+            // then there is no job to record the error on or to renew the task for.
+            LOG.warn("routine load job {} not found when handling submit failure of task {}",
+                    routineLoadTaskInfo.getJobId(), DebugUtil.printId(routineLoadTaskInfo.getId()));
+            return;
+        }
+        routineLoadJob.setOtherMsg(errorMsg);
+
+        // Resource-pressure errors are not rescheduled immediately, to avoid hammering a busy BE.
+        // The txn has already begun, so keep the previous behavior: set the execute start time so
+        // the task is treated as submitted and is aborted by the timeout check. Returning without
+        // it would presumably leave the task never timing out, stalling the job.
+        if (isResourcePressureError(errorMsg)) {
+            routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+            return;
+        }
+
+        // For other errors (network issues, BE restart, etc.), renew and reschedule immediately.
+        // NOTE(review): the "unprotect" prefix suggests the caller must hold the job's lock;
+        // confirm every caller of this path guarantees that.
+        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+        addTaskInQueue(newTask);
+    }
+
+    /** Returns whether {@code errorMsg} reports BE resource pressure; {@code null} yields false. */
+    private static boolean isResourcePressureError(String errorMsg) {
+        return errorMsg != null
+                && (errorMsg.contains("TOO_MANY_TASKS") || errorMsg.contains("MEM_LIMIT_EXCEEDED"));
+    }
+
     private void updateBackendSlotIfNecessary() {
         long currentTime = System.currentTimeMillis();
         if (lastBackendSlotUpdateTime == -1
@@ -287,6 +297,11 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             TStatus tStatus = 
client.submitRoutineLoadTask(Lists.newArrayList(tTask));
             ok = true;
 
+            if (DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")) 
{
+                LOG.warn("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED, 
routine load task submit failed");
+                throw new LoadException("debug point 
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED");
+            }
+
             if (tStatus.getStatusCode() != TStatusCode.OK) {
                 throw new LoadException("failed to submit task. error code: " 
+ tStatus.getStatusCode()
                         + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? 
tStatus.getErrorMsgs().get(0) : "NaN"));
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
new file mode 100644
index 00000000000..c892d9dbde0
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.errors.TopicExistsException
+import java.util.Collections
+import java.util.concurrent.ExecutionException
+
+// Verifies that routine load tasks whose submission fails are rescheduled promptly.
+// While the FE debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED is enabled, every task submission
+// fails on the FE side. Once it is disabled, the job must become RUNNING and load all rows
+// within the wait budget.
+suite("test_routine_load_job_schedule","nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_routine_load_job_schedule",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        def adminClient = AdminClient.create(props)
+        // 5 partitions: each test row below is sent to its own partition (partition = row index)
+        def newTopic = new NewTopic(kafkaCsvTopics[0], 5, (short)1)
+        def testData = [
+            "1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1",
+            "2,test_data_2,2023-01-02,value2,2023-01-02 11:00:00,extra2",
+            "3,test_data_3,2023-01-03,value3,2023-01-03 12:00:00,extra3",
+            "4,test_data_4,2023-01-04,value4,2023-01-04 13:00:00,extra4",
+            "5,test_data_5,2023-01-05,value5,2023-01-05 14:00:00,extra5"
+        ]
+        // Topic creation is asynchronous: wait for it before producing to explicit partitions.
+        // A topic left over from a previous run is accepted; any other failure aborts the test.
+        try {
+            adminClient.createTopics(Collections.singletonList(newTopic)).all().get()
+        } catch (ExecutionException e) {
+            if (!(e.cause instanceof TopicExistsException)) {
+                throw e
+            }
+        } finally {
+            adminClient.close()
+        }
+        testData.eachWithIndex { line, index ->
+            logger.info("Sending data to kafka: ${line}")
+            def record = new ProducerRecord<>(newTopic.name(), index, null, line)
+            producer.send(record)
+        }
+        // close() flushes the pending asynchronous sends
+        producer.close()
+
+        def tableName = "test_routine_load_job_schedule"
+        def job = "test_routine_load_job_schedule"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            // The debug point is evaluated in FE code (RoutineLoadTaskScheduler via DebugPointUtil),
+            // so it must be enabled on the FEs; enabling it on the BEs has no effect.
+            GetDebugPoint().enableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${newTopic.name()}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // let several submit attempts fail before allowing submissions to succeed again
+            sleep(5000)
+            GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+            def count = 0
+            def maxWaitCount = 60
+            while (true) {
+                def state = sql "show routine load for ${job}"
+                def routineLoadState = state[0][8].toString()
+                def statistic = state[0][14].toString()
+                logger.info("Routine load state: ${routineLoadState}")
+                logger.info("Routine load statistic: ${statistic}")
+                def rowCount = sql "select count(*) from ${tableName}"
+                if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
+                    break
+                }
+                if (count > maxWaitCount) {
+                    throw new IllegalStateException("routine load job ${job} did not load all rows in time, "
+                            + "state: ${routineLoadState}, rows: ${rowCount[0][0]}")
+                }
+                sleep(1000)
+                count++
+            }
+        } catch (Exception e) {
+            // log for context, then rethrow so the suite actually fails
+            logger.error("Test failed with exception: ${e.message}")
+            throw e
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+            try {
+                sql "stop routine load for ${job}"
+            } catch (Exception e) {
+                logger.warn("Failed to stop routine load job: ${e.message}")
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to