This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1b6476e3840 [enhance](job) delay load task schedule when transaction fail (#57092)
1b6476e3840 is described below
commit 1b6476e3840a159af35053ac0c53ef713ebdb9ca
Author: hui lai <[email protected]>
AuthorDate: Sun Oct 19 16:52:23 2025 +0800
[enhance](job) delay load task schedule when transaction fail (#57092)
Currently, when a transaction fails, the routine load task retries as soon as
possible. When it hits a temporarily unrecoverable error such as
`too_many_version`, it retries too many times in a short period and puts
huge pressure on the upstream system, such as Kafka.

To solve this problem, we delay load task scheduling when a transaction
fails, which reduces the number of retries on error, and restore the normal
schedule once transactions resume normal execution.
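In outline, the fix threads a delaySchedule flag from the transaction-abort
callback into the renewed task, and the task scheduler then defers any flagged
task until the job's max_batch_interval has elapsed. A minimal sketch of that
control flow, with simplified hypothetical types (the real classes are in the
diff below):

    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch only: stand-ins for RoutineLoadTaskInfo and RoutineLoadTaskScheduler.
    class Task {
        boolean delaySchedule;                            // set when the previous txn aborted
        long lastScheduledTime = System.currentTimeMillis();
    }

    class Scheduler {
        private final LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        private final long maxBatchIntervalMs = 10_000;   // job's max_batch_interval

        void onTxnAborted(Task old) throws InterruptedException {
            Task renewed = new Task();                    // the old task is dropped on renewal
            renewed.delaySchedule = true;                 // back off instead of retrying at once
            queue.put(renewed);
        }

        void runOnce() throws InterruptedException {
            Task task = queue.take();
            // Defer flagged tasks until one batch interval has passed since the last
            // schedule, so repeated aborts do not hammer the upstream system (e.g. Kafka).
            if (task.delaySchedule
                    && System.currentTimeMillis() - task.lastScheduledTime < maxBatchIntervalMs) {
                queue.put(task);                          // not due yet: re-queue and move on
                return;
            }
            dispatch(task);                               // due: submit to a BE as usual
        }

        private void dispatch(Task task) { /* send to a backend */ }
    }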
---
.../load/routineload/KafkaRoutineLoadJob.java | 3 +-
.../doris/load/routineload/RoutineLoadJob.java | 8 +-
.../load/routineload/RoutineLoadTaskInfo.java | 10 ++
.../load/routineload/RoutineLoadTaskScheduler.java | 4 +-
.../regression/util/RoutineLoadTestUtils.groovy | 152 +++++++++++++++++++++
.../test_routine_load_delay_schedule.groovy | 83 +++++++++++
6 files changed, 253 insertions(+), 7 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c2441e972cc..b708f3eb07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -356,11 +356,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
+ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) {
KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
// add new task
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
+ kafkaTaskInfo.setDelaySchedule(delaySchedule);
// remove old task
routineLoadTaskInfoList.remove(routineLoadTaskInfo);
// add new task
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 4102d6ccbdb..d768da582a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -807,7 +807,7 @@ public abstract class RoutineLoadJob
// and after renew, the previous task is removed from routineLoadTaskInfoList,
// so task can no longer be committed successfully.
// the already committed task will not be handled here.
- RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
+ RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo, false);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
}
}
@@ -987,7 +987,7 @@ public abstract class RoutineLoadJob
return 0L;
}
- abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo);
+ abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
// call before first scheduling
// derived class can override this.
@@ -1243,7 +1243,7 @@ public abstract class RoutineLoadJob
}
// create new task
- RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+ RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, false);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
} finally {
writeUnlock();
@@ -1395,7 +1395,7 @@ public abstract class RoutineLoadJob
if (state == JobState.RUNNING) {
if (txnStatus == TransactionStatus.ABORTED) {
- RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+ RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, true);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
} else if (txnStatus == TransactionStatus.COMMITTED) {
// this txn is just COMMITTED, create new task when this txn is VISIBLE
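Of the three call sites above, only the transaction-abort callback passes
true; the timeout and manual renew paths keep immediate rescheduling. A
stand-alone summary of that decision (hypothetical helper, not part of the
patch):

    // Hypothetical helper mirroring the call sites in this file: only a task whose
    // transaction ABORTED asks for delayed rescheduling; every other renew path
    // (timeout, manual renew, scheduler retry) reschedules immediately.
    class DelayFlagDemo {
        enum TxnStatus { COMMITTED, ABORTED, VISIBLE, UNKNOWN }

        static boolean shouldDelay(TxnStatus txnStatus) {
            return txnStatus == TxnStatus.ABORTED;
        }
    }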
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 0c662ce765d..2708bbb6ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -38,6 +38,8 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -76,6 +78,10 @@ public abstract class RoutineLoadTaskInfo {
protected boolean isEof = false;
+ @Getter
+ @Setter
+ protected boolean delaySchedule = false;
+
// this status will be set when corresponding transaction's status is changed.
// so that user or other logic can know the status of the corresponding txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -153,6 +159,10 @@ public abstract class RoutineLoadTaskInfo {
return isEof;
}
+ public boolean needDelaySchedule() {
+ return delaySchedule || isEof;
+ }
+
public boolean isTimeout() {
if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
// the corresponding txn is already finished, this task can not be treated as timeout.
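The new predicate simply widens the existing EOF back-off so it also covers
tasks renewed after an aborted transaction. Illustratively (a stand-alone
sketch, not part of the patch):

    // Truth table for the new predicate: defer if the task was renewed after an
    // aborted txn OR if it previously hit EOF on the source.
    public class DelayPredicateDemo {
        static boolean needDelaySchedule(boolean delaySchedule, boolean isEof) {
            return delaySchedule || isEof;
        }

        public static void main(String[] args) {
            System.out.println(needDelaySchedule(true, false));  // true: txn aborted
            System.out.println(needDelaySchedule(false, true));  // true: reached EOF
            System.out.println(needDelaySchedule(false, false)); // false: schedule now
        }
    }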
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d1b5a6f73e8..7d576ce594b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -104,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
// try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
// to avoid too many small transactions
- if (routineLoadTaskInfo.getIsEof()) {
+ if (routineLoadTaskInfo.needDelaySchedule()) {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
< routineLoadJob.getMaxBatchIntervalS() * 1000) {
@@ -258,7 +258,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
}
// for other errors (network issues, BE restart, etc.), reschedule immediately
- RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+ RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
addTaskInQueue(newTask);
}
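With max_batch_interval = 10, as in the test below, an aborted task is
therefore re-queued until ten seconds have passed since it was last
scheduled. A worked check of that arithmetic with hypothetical timestamps:

    // Hypothetical numbers plugged into the scheduler's deferral test above.
    public class DeferMathDemo {
        public static void main(String[] args) {
            long lastScheduledTime = 1_000_000L;  // t0 in ms
            long now = 1_001_000L;                // t0 + 1s: the txn has just aborted
            long maxBatchIntervalS = 10;          // job's max_batch_interval, seconds
            boolean defer = now - lastScheduledTime < maxBatchIntervalS * 1000;
            System.out.println(defer);            // true: 1000 ms < 10000 ms, re-queue
        }
    }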
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
new file mode 100644
index 00000000000..9a5e27d2680
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import groovy.json.JsonSlurper
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.junit.Assert
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class RoutineLoadTestUtils {
+ private static final Logger logger = LoggerFactory.getLogger(RoutineLoadTestUtils.class)
+
+ static boolean isKafkaTestEnabled(context) {
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ return enabled != null && enabled.equalsIgnoreCase("true")
+ }
+
+ static String getKafkaBroker(context) {
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ return "${externalEnvIp}:${kafka_port}"
+ }
+
+ static KafkaProducer createKafkaProducer(String kafkaBroker) {
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker)
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+ def producer = new KafkaProducer<>(props)
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions = prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail: ${e.message}".toString())
+ }
+ }
+ try {
+ logger.info("Kafka connecting: ${kafkaBroker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+ return producer
+ }
+
+ static void sendTestDataToKafka(KafkaProducer producer, List<String> topics, List<String> testData = null) {
+ if (testData == null) {
+ testData = [
+ "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+ "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+ ]
+ }
+ for (String topic in topics) {
+ testData.each { line ->
+ logger.info("Sending data to kafka: ${line}")
+ def record = new ProducerRecord<>(topic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ static void checkTaskTimeout(Closure sqlRunner, String jobName, String expectedTimeout, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'")
+ if (res.size() > 0) {
+ logger.info("res: ${res[0].toString()}")
+ logger.info("timeout: ${res[0][6].toString()}")
+ Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+ break;
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ }
+
+ static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("show routine load for ${job}")
+ def routineLoadState = res[0][8].toString()
+ def statistic = res[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+ def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+ if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
+ break
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ return count
+ }
+
+ static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("show routine load for ${job}")
+ def statistic = res[0][14].toString()
+ logger.info("Routine load statistic: ${statistic}")
+ def jsonSlurper = new JsonSlurper()
+ def json = jsonSlurper.parseText(res[0][14])
+ if (json.abortedTaskNum > 1) {
+ break
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
new file mode 100644
index 00000000000..b2e8dbef85b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_delay_schedule","nonConcurrent") {
+ def kafkaCsvTopics = [
+ "test_routine_load_delay_schedule",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_delay_schedule"
+ def job = "test_delay_schedule"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def injection = "RowsetBuilder.check_tablet_version_count.too_many_version"
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.waitForTaskAbort(runSql, job)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+ logger.info("wait count: " + count)
+ assertTrue(count > 5, "task should be delayed for scheduling")
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]