This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 434b2b3ca25 [fix](streaming job) register listener id when begin
transaction (#56056)
434b2b3ca25 is described below
commit 434b2b3ca251306b16c0ab1ee2ea1bfa75b0913b
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 15 17:40:22 2025 +0800
[fix](streaming job) register listener id when begin transaction (#56056)
### What problem does this PR solve?
Register the listener ID when beginning the transaction to ensure that the
before/after commit logic is executed.
---
.../insert/streaming/StreamingInsertJob.java | 4 +-
.../org/apache/doris/job/manager/JobManager.java | 2 +
.../doris/job/manager/StreamingTaskManager.java | 55 ++++++++++++++++++++++
.../job/scheduler/StreamingTaskScheduler.java | 12 ++---
.../plans/commands/insert/OlapInsertExecutor.java | 15 +++++-
gensrc/proto/cloud.proto | 2 +-
6 files changed, 79 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b18ffac380b..24df73cbbac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -171,7 +171,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
protected StreamingInsertTask createStreamingInsertTask() {
this.runningStreamTask = new StreamingInsertTask(getJobId(),
AbstractTask.getNextTaskId(), getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties,
getCreateUser());
-
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
return runningStreamTask;
}
@@ -213,6 +213,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
public void onStreamTaskFail(StreamingInsertTask task) throws JobException
{
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL,
task.getErrMsg());
}
@@ -220,6 +221,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
public void onStreamTaskSuccess(StreamingInsertTask task) {
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
StreamingInsertTask nextTask = createStreamingInsertTask();
this.runningStreamTask = nextTask;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 9954d26b6f1..2b8c7225b2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -74,6 +74,8 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
private JobScheduler<T, C> jobScheduler;
+ @Getter
+ private final StreamingTaskManager streamingTaskManager = new
StreamingTaskManager();
@Getter
private StreamingTaskScheduler streamingTaskScheduler;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java
new file mode 100644
index 00000000000..7c6fecee14a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.manager;
+
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+public class StreamingTaskManager {
+ @Getter
+ private final LinkedBlockingDeque<StreamingInsertTask>
needScheduleTasksQueue = new LinkedBlockingDeque<>();
+ @Getter
+ private List<StreamingInsertTask> runningTasks =
Collections.synchronizedList(new ArrayList<>());
+
+ public void registerTask(StreamingInsertTask task) {
+ needScheduleTasksQueue.add(task);
+ }
+
+ public StreamingInsertTask getStreamingInsertTaskById(long taskId) {
+ synchronized (runningTasks) {
+ return runningTasks.stream()
+ .filter(task -> task.getTaskId() == taskId)
+ .findFirst()
+ .orElse(null);
+ }
+ }
+
+ public void addRunningTask(StreamingInsertTask task) {
+ runningTasks.add(task);
+ }
+
+ public void removeRunningTask(StreamingInsertTask task) {
+ runningTasks.remove(task);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 3562ed028d2..3fbae399303 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit;
@Log4j2
public class StreamingTaskScheduler extends MasterDaemon {
- private final LinkedBlockingDeque<StreamingInsertTask>
needScheduleTasksQueue = new LinkedBlockingDeque<>();
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
0,
Config.max_streaming_job_num,
@@ -63,12 +62,10 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
}
- public void registerTask(StreamingInsertTask task) {
- needScheduleTasksQueue.add(task);
- }
-
private void process() throws InterruptedException {
List<StreamingInsertTask> tasks = new ArrayList<>();
+ LinkedBlockingDeque<StreamingInsertTask> needScheduleTasksQueue =
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getNeedScheduleTasksQueue();
tasks.add(needScheduleTasksQueue.take());
needScheduleTasksQueue.drainTo(tasks);
scheduleTasks(tasks);
@@ -106,13 +103,14 @@ public class StreamingTaskScheduler extends MasterDaemon {
return;
}
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
- task.execute();
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+ task.execute();
}
private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs)
{
delayScheduler.schedule(() -> {
- needScheduleTasksQueue.add(task);
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
}, delayMs, TimeUnit.MILLISECONDS);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 47ef9cc3c2e..f41d8893f02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -98,16 +99,26 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
throw new BeginTransactionException("current running txns on
db is larger than limit");
}
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- database.getId(), ImmutableList.of(table.getId()),
labelName,
+ database.getId(), ImmutableList.of(table.getId()),
labelName, null,
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
- LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
+ LoadJobSourceType.INSERT_STREAMING, getListenerId(),
ctx.getExecTimeoutS());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " +
e.getMessage(), e);
}
}
+ private long getListenerId() {
+ long listenerId = -1;
+ StreamingInsertTask streamingInsertTask =
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
+ if (streamingInsertTask != null) {
+ listenerId = streamingInsertTask.getJobId();
+ }
+ return listenerId;
+ }
+
@Override
public void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
OlapTableSink olapTableSink = (OlapTableSink) sink;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7c630f03353..aa6bcd28359 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -368,7 +368,7 @@ message RoutineLoadJobStatisticPB {
message StreamingTaskCommitAttachmentPB {
optional int64 job_id = 1;
- optional UniqueIdPB task_id = 2;
+ optional int64 task_id = 2;
optional string offset = 3;
optional int64 scanned_rows = 4;
optional int64 load_bytes = 5;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]