This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 47f2e1e7486 [feat](streaming job) introduce streaming task scheduler 
(#55857)
47f2e1e7486 is described below

commit 47f2e1e7486552334c570f20069a706ac774544a
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 10 16:16:54 2025 +0800

    [feat](streaming job) introduce streaming task scheduler (#55857)
    
    ### What problem does this PR solve?
    
     Introduce streaming task scheduler to schedule all streaming tasks.
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../insert/streaming/StreamingInsertJob.java       |  20 +++-
 .../insert/streaming/StreamingInsertTask.java      |  36 +++++++
 .../streaming/StreamingJobSchedulerTask.java       |   2 +-
 .../org/apache/doris/job/manager/JobManager.java   |   6 +-
 .../job/scheduler/StreamingTaskScheduler.java      | 110 +++++++++++++++++++++
 6 files changed, 173 insertions(+), 6 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e2c5b9eb8ae..7770318a5e9 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1955,6 +1955,11 @@ public class Config extends ConfigBase {
                     + " greater than 0, otherwise it defaults to 3." })
     public static int job_dictionary_task_consumer_thread_num = 3;
 
+    @ConfField(masterOnly = true, description = {"最大的 Streaming 
作业数量,值应该大于0,否则默认为1024",
+            "The maximum number of Streaming jobs, "
+                    + "the value should be greater than 0, if it is <=0, 
default is 1024."})
+    public static int max_streaming_job_num = 1024;
+
     /* job test config */
     /**
      * If set to true, we will allow the interval unit to be set to second, 
when creating a recurring job.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a2016c52cb9..a4bbf51b8a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -42,9 +42,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @SerializedName("did")
     private final long dbId;
-    @Getter
-    @SerializedName("st")
-    protected JobStatus status;
 
     @Getter
     protected PauseReason pauseReason;
@@ -61,6 +58,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("jp")
     private StreamingJobProperties jobProperties;
 
+    private long lastScheduleTaskTimestamp = -1L;
+
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
             String dbName,
@@ -76,7 +75,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobProperties = jobProperties;
     }
 
-
     @Override
     public void updateJobStatus(JobStatus status) throws JobException {
         super.updateJobStatus(status);
@@ -128,4 +126,18 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
+
+    /**
+     * Tells whether this job is in a state in which its tasks may be scheduled.
+     *
+     * @return {@code true} if the job status is RUNNING or PENDING,
+     *         {@code false} for every other status
+     */
+    public boolean needScheduleTask() {
+        if (getJobStatus().equals(JobStatus.RUNNING)) {
+            return true;
+        }
+        return getJobStatus().equals(JobStatus.PENDING);
+    }
+
+    // When consumer to EOF, delay schedule task appropriately can avoid too 
many small transactions.
+    public boolean needDelayScheduleTask() {
+        return System.currentTimeMillis() - lastScheduleTaskTimestamp > 
jobProperties.getMaxIntervalSecond() * 1000;
+    }
+
+    /**
+     * Intended to report whether there is still data left for this job to
+     * consume.
+     *
+     * @return currently always {@code true}; the real check is not
+     *         implemented yet
+     */
+    public boolean hasMoreDataToConsume() {
+        // TODO: implement this
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
new file mode 100644
index 00000000000..4e07ee01e30
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import lombok.Getter;
+
+/**
+ * A single task of a streaming insert job, identified by the id of the job
+ * it belongs to and by its own task id. Getters for both ids are generated
+ * by Lombok.
+ */
+@Getter
+public class StreamingInsertTask {
+    // Id of the job this task was created for.
+    private long jobId;
+
+    // Id of this task.
+    private long taskId;
+
+    /**
+     * Creates a task bound to the given job.
+     *
+     * @param jobId id of the owning job
+     * @param taskId id of this task
+     */
+    public StreamingInsertTask(long jobId, long taskId) {
+        this.taskId = taskId;
+        this.jobId = jobId;
+    }
+
+    /**
+     * Runs the task. No work is performed yet.
+     */
+    public void execute() {
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 51ab96b3b17..888669f428a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -37,7 +37,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
 
     @Override
     public void run() throws JobException {
-        switch (streamingInsertJob.getStatus()) {
+        switch (streamingInsertJob.getJobStatus()) {
             case PENDING:
                 streamingInsertJob.createStreamingInsertTask();
                 streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index e763f8de590..ae6bad07066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -40,6 +40,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.scheduler.JobScheduler;
+import org.apache.doris.job.scheduler.StreamingTaskScheduler;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.expressions.And;
@@ -72,6 +73,8 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
 
     private JobScheduler<T, C> jobScheduler;
 
+    private StreamingTaskScheduler streamingTaskScheduler;
+
     // lock for job
     // lock is private and must use after db lock
     private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
@@ -95,9 +98,10 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
     public void start() {
         jobScheduler = new JobScheduler<T, C>(jobMap);
         jobScheduler.start();
+        streamingTaskScheduler = new StreamingTaskScheduler();
+        streamingTaskScheduler.start();
     }
 
-
     /**
      * get running job
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
new file mode 100644
index 00000000000..d87a2b7833d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.scheduler;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+
+import lombok.extern.log4j.Log4j2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Log4j2
+public class StreamingTaskScheduler extends MasterDaemon {
+    private final LinkedBlockingDeque<StreamingInsertTask> 
needScheduleTasksQueue = new LinkedBlockingDeque<>();
+    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+                    0,
+                    Config.max_streaming_job_num,
+                    60,
+                    TimeUnit.SECONDS,
+                    new ArrayBlockingQueue<>(Config.max_streaming_job_num),
+                    new CustomThreadFactory("streaming-task-execute"),
+                    new ThreadPoolExecutor.AbortPolicy()
+            );
+    private final ScheduledThreadPoolExecutor delayScheduler
+                = new ScheduledThreadPoolExecutor(1, new 
CustomThreadFactory("streaming-task-delay-scheduler"));
+
+    public StreamingTaskScheduler() {
+        super("Streaming-task-scheduler", 1);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            process();
+        } catch (Throwable e) {
+            log.warn("Failed to process one round of StreamingTaskScheduler", 
e);
+        }
+    }
+
+    public void registerTask(StreamingInsertTask task) {
+        needScheduleTasksQueue.add(task);
+    }
+
+    private void process() throws InterruptedException {
+        List<StreamingInsertTask> tasks = new ArrayList<>();
+        tasks.add(needScheduleTasksQueue.take());
+        needScheduleTasksQueue.drainTo(tasks);
+        scheduleTasks(tasks);
+    }
+
+    private void scheduleTasks(List<StreamingInsertTask> tasks) {
+        for (StreamingInsertTask task : tasks) {
+            threadPool.execute(() -> scheduleOneTask(task));
+        }
+    }
+
+    private void scheduleOneTask(StreamingInsertTask task) {
+        StreamingInsertJob job = (StreamingInsertJob) 
Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
+        if (job == null) {
+            log.warn("Job not found, job id: {}", task.getJobId());
+            return;
+        }
+        if (!job.needScheduleTask()) {
+            log.info("do not need to schedule invalid task, task id: {}, job 
id: {}",
+                        task.getTaskId(), task.getJobId());
+            return;
+        }
+        if (job.hasMoreDataToConsume()) {
+            scheduleTaskWithDelay(task, 500);
+            return;
+        }
+        if (job.needDelayScheduleTask()) {
+            scheduleTaskWithDelay(task, 500);
+            return;
+        }
+        log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
+        task.execute();
+    }
+
+    private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
+        delayScheduler.schedule(() -> {
+            needScheduleTasksQueue.add(task);
+        }, delayMs, TimeUnit.MILLISECONDS);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to