This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 471f74704a3 branch-4.0: [enhance](job) support adaptive batch param
for routine load job (#58846)
471f74704a3 is described below
commit 471f74704a3fd5aaa494ddbece88b27090e26577
Author: hui lai <[email protected]>
AuthorDate: Wed Dec 17 18:03:19 2025 +0800
branch-4.0: [enhance](job) support adaptive batch param for routine load
job (#58846)
### What problem does this PR solve?
Cherry-pick of https://github.com/apache/doris/pull/56930 and
https://github.com/apache/doris/pull/57967.
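Users may set a relatively small max batch interval so that loaded data becomes
visible sooner. When traffic is high, this can cause insufficient throughput and a
data backlog. This change introduces an adaptive scheme that prioritizes throughput
over visibility while data is backlogged.

When a routine load task is not at EOF, the FE raises its limits:
- the max batch interval is raised to at least `routine_load_adaptive_min_batch_interval_sec` (default 360s);
- the max batch rows and max batch size are raised to at least the job defaults;
- the task timeout is scaled from the raised interval.

Once a task reaches EOF again, the job's own configured parameters are used.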
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../doris/load/routineload/KafkaTaskInfo.java | 26 ++++++-
.../load/routineload/RoutineLoadTaskInfo.java | 2 +-
.../doris/nereids/load/NereidsLoadTaskInfo.java | 3 +
.../nereids/load/NereidsRoutineLoadTaskInfo.java | 15 +++-
.../test_routine_load_adaptive_param.groovy | 91 ++++++++++++++++++++++
6 files changed, 136 insertions(+), 7 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b34dde248c6..3ae4c1da5c2 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1266,6 +1266,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int routine_load_blacklist_expire_time_second = 300;
+ /**
+ * Lower bound, in seconds, for the max batch interval of a routine load task that is
+ * not flagged as EOF (its previous batch hit a row, size or interval limit, i.e. data
+ * is presumably backlogged). In that case the task's max batch interval is raised to
+ * at least this value, trading visibility latency for throughput.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int routine_load_adaptive_min_batch_interval_sec = 360;
+
/**
* The max number of files store in SmallFileMgr
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index a3d87ccef8b..fafb34d960d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.load.NereidsLoadTaskInfo;
import org.apache.doris.nereids.load.NereidsStreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileFormatType;
@@ -100,6 +101,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo);
tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
tRoutineLoadTask.setIsMultiTable(isMultiTable);
+ adaptiveBatchParam(tRoutineLoadTask, routineLoadJob);
if (!isMultiTable) {
Table tbl =
database.getTableOrMetaException(routineLoadJob.getTableId());
tRoutineLoadTask.setTbl(tbl.getName());
@@ -107,9 +109,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
} else {
Env.getCurrentEnv().getRoutineLoadManager().addMultiLoadTaskTxnIdToRoutineLoadJobId(txnId,
jobId);
}
-
tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
- tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
-
tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes());
if (!routineLoadJob.getFormat().isEmpty() &&
routineLoadJob.getFormat().equalsIgnoreCase("json")) {
tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_JSON);
} else {
@@ -121,6 +120,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return tRoutineLoadTask;
}
+ /**
+ * Fills the max batch interval/rows/size of {@code tRoutineLoadTask} and updates
+ * {@code this.timeoutMs}, adapting both to whether this task is flagged as EOF.
+ *
+ * <p>When {@code isEof} is false (judgeEof clears it when a committed batch reached the
+ * row, size or interval limit, i.e. data is presumably backlogged), each limit is raised
+ * to at least {@link Config#routine_load_adaptive_min_batch_interval_sec},
+ * {@link RoutineLoadJob#DEFAULT_MAX_BATCH_ROWS} and
+ * {@link RoutineLoadJob#DEFAULT_MAX_BATCH_SIZE} respectively, favoring throughput over
+ * visibility, and the timeout is derived from the raised interval. At EOF the job's own
+ * limits and timeout are used unchanged.
+ *
+ * @param tRoutineLoadTask thrift task whose batch limits are set
+ * @param routineLoadJob job supplying the configured limits and timeout
+ */
+ private void adaptiveBatchParam(TRoutineLoadTask tRoutineLoadTask, RoutineLoadJob routineLoadJob) {
+ long maxBatchIntervalS = routineLoadJob.getMaxBatchIntervalS();
+ long maxBatchRows = routineLoadJob.getMaxBatchRows();
+ long maxBatchSize = routineLoadJob.getMaxBatchSizeBytes();
+ if (!isEof) {
+ maxBatchIntervalS = Math.max(maxBatchIntervalS, Config.routine_load_adaptive_min_batch_interval_sec);
+ maxBatchRows = Math.max(maxBatchRows, RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS);
+ maxBatchSize = Math.max(maxBatchSize, RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE);
+ // Apply the same floor as NereidsRoutineLoadTaskInfo#calTimeoutSec: this value later
+ // overrides the planner timeout, so a small adaptive interval or multiplier must not
+ // yield a timeout below the configured minimum.
+ long timeoutSec = Math.max(maxBatchIntervalS * Config.routine_load_task_timeout_multiplier,
+ Config.routine_load_task_min_timeout_sec);
+ this.timeoutMs = timeoutSec * 1000;
+ } else {
+ this.timeoutMs = routineLoadJob.getTimeout() * 1000;
+ }
+ tRoutineLoadTask.setMaxIntervalS(maxBatchIntervalS);
+ tRoutineLoadTask.setMaxBatchRows(maxBatchRows);
+ tRoutineLoadTask.setMaxBatchSize(maxBatchSize);
+ }
+
@Override
protected String getTaskDataSourceProperties() {
Gson gson = new Gson();
@@ -137,9 +153,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(),
id.getLeastSignificantBits());
// plan for each task, in case table has change(rollup or schema
change)
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+ NereidsLoadTaskInfo taskInfo =
routineLoadJob.toNereidsRoutineLoadTaskInfo();
+ taskInfo.setTimeout((int) (this.timeoutMs / 1000));
NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db,
(OlapTable)
db.getTableOrMetaException(routineLoadJob.getTableId(),
- Table.TableType.OLAP),
routineLoadJob.toNereidsRoutineLoadTaskInfo());
+ Table.TableType.OLAP), taskInfo);
TPipelineFragmentParams tExecPlanFragmentParams =
routineLoadJob.plan(planner, loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 2708bbb6ebe..aa8c2ccc63c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,7 +189,7 @@ public abstract class RoutineLoadTaskInfo {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
if (rlTaskTxnCommitAttachment.getTotalRows() <
routineLoadJob.getMaxBatchRows()
&& rlTaskTxnCommitAttachment.getReceivedBytes() <
routineLoadJob.getMaxBatchSizeBytes()
- && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() <
this.timeoutMs) {
+ && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() <
routineLoadJob.getMaxBatchIntervalS() * 1000) {
this.isEof = true;
} else {
this.isEof = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
index 0d15e3d9086..efebf0018c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
@@ -42,6 +42,9 @@ public interface NereidsLoadTaskInfo {
int getTimeout();
+ /**
+ * Overrides the load timeout, in seconds, returned by {@link #getTimeout()}.
+ * The default implementation ignores the value.
+ * NOTE(review): implementations that do not override this silently drop the timeout;
+ * NereidsRoutineLoadTaskInfo overrides it.
+ */
+ default void setTimeout(int timeout) {
+ }
+
long getMemLimit();
String getTimezone();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 8fac34ecf28..381912882d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -67,6 +67,7 @@ public class NereidsRoutineLoadTaskInfo implements
NereidsLoadTaskInfo {
protected boolean loadToSingleTablet;
protected boolean isPartialUpdate;
protected boolean memtableOnSinkNode;
+ protected int timeoutSec;
/**
* NereidsRoutineLoadTaskInfo
@@ -96,6 +97,7 @@ public class NereidsRoutineLoadTaskInfo implements
NereidsLoadTaskInfo {
this.loadToSingleTablet = loadToSingleTablet;
this.isPartialUpdate = isPartialUpdate;
this.memtableOnSinkNode = memtableOnSinkNode;
+ this.timeoutSec = calTimeoutSec();
}
@Override
@@ -108,14 +110,23 @@ public class NereidsRoutineLoadTaskInfo implements
NereidsLoadTaskInfo {
return -1L;
}
- @Override
- public int getTimeout() {
+ /**
+ * Computes the default task timeout in seconds: {@code maxBatchIntervalS} times
+ * {@link Config#routine_load_task_timeout_multiplier}, floored at
+ * {@link Config#routine_load_task_min_timeout_sec}. Called from the constructor to
+ * initialize {@code timeoutSec}.
+ * NOTE(review): this method is public and overridable yet invoked from the constructor;
+ * consider making it private. The (int) cast binds to maxBatchIntervalS before the
+ * multiplication, so the product is computed in int arithmetic.
+ */
+ public int calTimeoutSec() {
int timeoutSec = (int) maxBatchIntervalS *
Config.routine_load_task_timeout_multiplier;
int realTimeoutSec = timeoutSec <
Config.routine_load_task_min_timeout_sec
? Config.routine_load_task_min_timeout_sec : timeoutSec;
return realTimeoutSec;
}
+ /**
+ * Returns the task timeout in seconds: the value computed by calTimeoutSec at
+ * construction, unless it was replaced via setTimeout.
+ */
+ @Override
+ public int getTimeout() {
+ return this.timeoutSec;
+ }
+
+ /**
+ * Replaces the task timeout in seconds, e.g. with the adaptive timeout chosen for a
+ * Kafka routine load task before planning.
+ */
+ @Override
+ public void setTimeout(int timeout) {
+ this.timeoutSec = timeout;
+ }
+
@Override
public long getMemLimit() {
return execMemLimit;
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
new file mode 100644
index 00000000000..31ab6278df1
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_adaptive_param","nonConcurrent") {
+ // Kafka topic(s) that receive the CSV test data for this suite.
+ def kafkaCsvTopics = [
+ "test_routine_load_adaptive_param",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_adaptive_param"
+ def job = "test_adaptive_param"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ // A small max_batch_interval (10s) makes the adaptive increase observable.
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // Consume the first batch with the judgeEof debug point enabled.
+ // NOTE(review): presumably the debug point makes the task be judged as not-EOF,
+ // which triggers the adaptive parameters checked below; confirm in judgeEof.
+ def injection = "RoutineLoadTaskInfo.judgeEof"
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs(injection)
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(injection)
+ }
+ // test adaptively increase: a non-EOF task runs with the adaptive interval, so its
+ // timeout is expected to be 3600s.
+ logger.info("---test adaptively increase---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 2)
+
+ // test restore adaptively: once EOF is reached again, the job's own parameters
+ // apply and the timeout is expected to drop back to 100s.
+ logger.info("---test restore adaptively---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 4)
+ RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "100")
+ } finally {
+ sql "stop routine load for ${job}"
+ // Release the Kafka producer created above so the suite does not leak it.
+ producer.close()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]