This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 81e705b30b7 branch-4.0: [Fix](job) add thread num config for streaming task exec #57230 (#57249)
81e705b30b7 is described below
commit 81e705b30b7deb71ff12cd08f335660c6bcd5fb8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 13:09:33 2025 +0800
branch-4.0: [Fix](job) add thread num config for streaming task exec #57230 (#57249)
Cherry-picked from #57230
Co-authored-by: wudi <[email protected]>
---
fe/fe-common/src/main/java/org/apache/doris/common/Config.java | 5 +++++
.../org/apache/doris/job/scheduler/StreamingTaskScheduler.java | 8 +++++---
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7f1080871b1..3b707240aab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1970,6 +1970,11 @@ public class Config extends ConfigBase {
+ " greater than 0, otherwise it defaults to 3." })
public static int job_dictionary_task_consumer_thread_num = 3;
+ @ConfField(masterOnly = true, description = {"用于执行 Streaming
任务的线程数,值应该大于0,否则默认为10",
+ "The number of threads used to execute Streaming Tasks, "
+ + "the value should be greater than 0, if it is <=0,
default is 10."})
+ public static int job_streaming_task_exec_thread_num = 10;
+
@ConfField(masterOnly = true, description = {"最大的 Streaming
作业数量,值应该大于0,否则默认为1024",
"The maximum number of Streaming jobs, "
+ "the value should be greater than 0, if it is <=0,
default is 1024."})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 91b6f475bfc..7e99ca3ada9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -41,9 +41,9 @@ import java.util.concurrent.TimeUnit;
@Log4j2
public class StreamingTaskScheduler extends MasterDaemon {
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+ Config.job_streaming_task_exec_thread_num,
+ Config.job_streaming_task_exec_thread_num,
0,
- Config.max_streaming_job_num,
- 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(Config.max_streaming_job_num),
new CustomThreadFactory("streaming-task-execute"),
@@ -120,9 +120,11 @@ public class StreamingTaskScheduler extends MasterDaemon {
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
-
+ long start = System.currentTimeMillis();
try {
task.execute();
+ log.info("Finished executing task, task id: {}, job id: {}, cost
{}ms",
+ task.getTaskId(), task.getJobId(),
System.currentTimeMillis() - start);
} catch (Exception e) {
log.error("Failed to execute task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]