This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 81e705b30b7 branch-4.0: [Fix](job) add thread num config for streaming task exec #57230 (#57249)
81e705b30b7 is described below

commit 81e705b30b7deb71ff12cd08f335660c6bcd5fb8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 13:09:33 2025 +0800

    branch-4.0: [Fix](job) add thread num config for streaming task exec #57230 (#57249)
    
    Cherry-picked from #57230
    
    Co-authored-by: wudi <[email protected]>
---
 fe/fe-common/src/main/java/org/apache/doris/common/Config.java    | 5 +++++
 .../org/apache/doris/job/scheduler/StreamingTaskScheduler.java    | 8 +++++---
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7f1080871b1..3b707240aab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1970,6 +1970,11 @@ public class Config extends ConfigBase {
                     + " greater than 0, otherwise it defaults to 3." })
     public static int job_dictionary_task_consumer_thread_num = 3;
 
+    @ConfField(masterOnly = true, description = {"用于执行 Streaming 
任务的线程数,值应该大于0,否则默认为10",
+            "The number of threads used to execute Streaming Tasks, "
+                    + "the value should be greater than 0, if it is <=0, 
default is 10."})
+    public static int job_streaming_task_exec_thread_num = 10;
+
     @ConfField(masterOnly = true, description = {"最大的 Streaming 
作业数量,值应该大于0,否则默认为1024",
             "The maximum number of Streaming jobs, "
                     + "the value should be greater than 0, if it is <=0, 
default is 1024."})
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 91b6f475bfc..7e99ca3ada9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -41,9 +41,9 @@ import java.util.concurrent.TimeUnit;
 @Log4j2
 public class StreamingTaskScheduler extends MasterDaemon {
     private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+                    Config.job_streaming_task_exec_thread_num,
+                    Config.job_streaming_task_exec_thread_num,
                     0,
-                    Config.max_streaming_job_num,
-                    60,
                     TimeUnit.SECONDS,
                     new ArrayBlockingQueue<>(Config.max_streaming_job_num),
                     new CustomThreadFactory("streaming-task-execute"),
@@ -120,9 +120,11 @@ public class StreamingTaskScheduler extends MasterDaemon {
         log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
         job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
-
+        long start = System.currentTimeMillis();
         try {
             task.execute();
+            log.info("Finished executing task, task id: {}, job id: {}, cost 
{}ms",
+                    task.getTaskId(), task.getJobId(), 
System.currentTimeMillis() - start);
         } catch (Exception e) {
             log.error("Failed to execute task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId(), e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to