This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ce11bb7345d branch-4.0: [enhance](job) terminate streaming task
execute threads promptly when idle #58041 (#58064)
ce11bb7345d is described below
commit ce11bb7345deec0cbd742260a4007abff3a1f47e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 18 12:10:49 2025 +0800
branch-4.0: [enhance](job) terminate streaming task execute threads
promptly when idle #58041 (#58064)
Cherry-picked from #58041
Co-authored-by: hui lai <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 6 +++---
.../job/scheduler/StreamingTaskScheduler.java | 24 ++++++++++++++--------
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 250b272289e..9b188d3452b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1985,10 +1985,10 @@ public class Config extends ConfigBase {
+ " greater than 0, otherwise it defaults to 3." })
public static int job_dictionary_task_consumer_thread_num = 3;
- @ConfField(masterOnly = true, description = {"用于执行 Streaming
任务的线程数,值应该大于0,否则默认为10",
+ @ConfField(masterOnly = true, description = {"用于执行 Streaming
任务的线程数,值应该大于0,否则默认为100",
"The number of threads used to execute Streaming Tasks, "
- + "the value should be greater than 0, if it is <=0,
default is 10."})
- public static int job_streaming_task_exec_thread_num = 10;
+ + "the value should be greater than 0, if it is <=0,
default is 100."})
+ public static int job_streaming_task_exec_thread_num = 100;
@ConfField(masterOnly = true, description = {"最大的 Streaming
作业数量,值应该大于0,否则默认为1024",
"The maximum number of Streaming jobs, "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 51e17f214c6..ea1cf7825ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -40,15 +40,21 @@ import java.util.concurrent.TimeUnit;
@Log4j2
public class StreamingTaskScheduler extends MasterDaemon {
- private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
- Config.job_streaming_task_exec_thread_num,
- Config.job_streaming_task_exec_thread_num,
- 0,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(Config.max_streaming_job_num),
- new CustomThreadFactory("streaming-task-execute"),
- new ThreadPoolExecutor.AbortPolicy()
- );
+ private final ThreadPoolExecutor threadPool;
+
+ {
+ threadPool = new ThreadPoolExecutor(
+ Config.job_streaming_task_exec_thread_num,
+ Config.job_streaming_task_exec_thread_num,
+ 60L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(Config.max_streaming_job_num),
+ new CustomThreadFactory("streaming-task-execute"),
+ new ThreadPoolExecutor.AbortPolicy()
+ );
+ threadPool.allowCoreThreadTimeOut(true);
+ }
+
private final ScheduledThreadPoolExecutor delayScheduler
= new ScheduledThreadPoolExecutor(1, new
CustomThreadFactory("streaming-task-delay-scheduler"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]