This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 434b2b3ca25 [fix](streaming job) register listener id when begin 
transaction (#56056)
434b2b3ca25 is described below

commit 434b2b3ca251306b16c0ab1ee2ea1bfa75b0913b
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 15 17:40:22 2025 +0800

    [fix](streaming job) register listener id when begin transaction (#56056)
    
    ### What problem does this PR solve?
    
    Register listener id when begin transaction to ensure before/after
    commit logic would be executed.
---
 .../insert/streaming/StreamingInsertJob.java       |  4 +-
 .../org/apache/doris/job/manager/JobManager.java   |  2 +
 .../doris/job/manager/StreamingTaskManager.java    | 55 ++++++++++++++++++++++
 .../job/scheduler/StreamingTaskScheduler.java      | 12 ++---
 .../plans/commands/insert/OlapInsertExecutor.java  | 15 +++++-
 gensrc/proto/cloud.proto                           |  2 +-
 6 files changed, 79 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b18ffac380b..24df73cbbac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -171,7 +171,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     protected StreamingInsertTask createStreamingInsertTask() {
         this.runningStreamTask = new StreamingInsertTask(getJobId(), 
AbstractTask.getNextTaskId(), getExecuteSql(),
                 offsetProvider, getCurrentDbName(), jobProperties, 
getCreateUser());
-        
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
         return runningStreamTask;
     }
@@ -213,6 +213,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     public void onStreamTaskFail(StreamingInsertTask task) throws JobException 
{
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
             this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, 
task.getErrMsg());
         }
@@ -220,6 +221,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     public void onStreamTaskSuccess(StreamingInsertTask task) {
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         StreamingInsertTask nextTask = createStreamingInsertTask();
         this.runningStreamTask = nextTask;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 9954d26b6f1..2b8c7225b2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -74,6 +74,8 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
 
     private JobScheduler<T, C> jobScheduler;
 
+    @Getter
+    private final StreamingTaskManager streamingTaskManager = new 
StreamingTaskManager();
     @Getter
     private StreamingTaskScheduler streamingTaskScheduler;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java
new file mode 100644
index 00000000000..7c6fecee14a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.manager;
+
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+public class StreamingTaskManager {
+    @Getter
+    private final LinkedBlockingDeque<StreamingInsertTask> 
needScheduleTasksQueue = new LinkedBlockingDeque<>();
+    @Getter
+    private List<StreamingInsertTask> runningTasks = 
Collections.synchronizedList(new ArrayList<>());
+
+    public void registerTask(StreamingInsertTask task) {
+        needScheduleTasksQueue.add(task);
+    }
+
+    public StreamingInsertTask getStreamingInsertTaskById(long taskId) {
+        synchronized (runningTasks) {
+            return runningTasks.stream()
+                    .filter(task -> task.getTaskId() == taskId)
+                    .findFirst()
+                    .orElse(null);
+        }
+    }
+
+    public void addRunningTask(StreamingInsertTask task) {
+        runningTasks.add(task);
+    }
+
+    public void removeRunningTask(StreamingInsertTask task) {
+        runningTasks.remove(task);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 3562ed028d2..3fbae399303 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit;
 
 @Log4j2
 public class StreamingTaskScheduler extends MasterDaemon {
-    private final LinkedBlockingDeque<StreamingInsertTask> 
needScheduleTasksQueue = new LinkedBlockingDeque<>();
     private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                     0,
                     Config.max_streaming_job_num,
@@ -63,12 +62,10 @@ public class StreamingTaskScheduler extends MasterDaemon {
         }
     }
 
-    public void registerTask(StreamingInsertTask task) {
-        needScheduleTasksQueue.add(task);
-    }
-
     private void process() throws InterruptedException {
         List<StreamingInsertTask> tasks = new ArrayList<>();
+        LinkedBlockingDeque<StreamingInsertTask> needScheduleTasksQueue =
+                
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getNeedScheduleTasksQueue();
         tasks.add(needScheduleTasksQueue.take());
         needScheduleTasksQueue.drainTo(tasks);
         scheduleTasks(tasks);
@@ -106,13 +103,14 @@ public class StreamingTaskScheduler extends MasterDaemon {
             return;
         }
         log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
-        task.execute();
         job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+        task.execute();
     }
 
     private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
         delayScheduler.schedule(() -> {
-            needScheduleTasksQueue.add(task);
+            
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
         }, delayMs, TimeUnit.MILLISECONDS);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 47ef9cc3c2e..f41d8893f02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -98,16 +99,26 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                 throw new BeginTransactionException("current running txns on 
db is larger than limit");
             }
             this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    database.getId(), ImmutableList.of(table.getId()), 
labelName,
+                    database.getId(), ImmutableList.of(table.getId()), 
labelName, null,
                     new TxnCoordinator(TxnSourceType.FE, 0,
                             FrontendOptions.getLocalHostAddress(),
                             ExecuteEnv.getInstance().getStartupTime()),
-                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
+                    LoadJobSourceType.INSERT_STREAMING, getListenerId(), 
ctx.getExecTimeoutS());
         } catch (Exception e) {
             throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
         }
     }
 
+    private long getListenerId() {
+        long listenerId = -1;
+        StreamingInsertTask streamingInsertTask =
+                
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
+        if (streamingInsertTask != null) {
+            listenerId = streamingInsertTask.getJobId();
+        }
+        return listenerId;
+    }
+
     @Override
     public void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink) {
         OlapTableSink olapTableSink = (OlapTableSink) sink;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7c630f03353..aa6bcd28359 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -368,7 +368,7 @@ message RoutineLoadJobStatisticPB {
 
 message StreamingTaskCommitAttachmentPB {
     optional int64 job_id = 1;
-    optional UniqueIdPB task_id = 2;
+    optional int64 task_id = 2;
     optional string offset = 3;
     optional int64 scanned_rows = 4;
     optional int64 load_bytes = 5;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to