This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c210d328dc2 branch-4.0: [Improve](job) add history task info in task tvf show #57361 (#57453)
c210d328dc2 is described below

commit c210d328dc2a1b92414c3f382038ead6b542dbef
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 30 10:49:06 2025 +0800

    branch-4.0: [Improve](job) add history task info in task tvf show #57361 (#57453)
    
    Cherry-picked from #57361
    
    Co-authored-by: wudi <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  5 ++
 .../doris/job/extensions/insert/InsertJob.java     |  2 +
 .../doris/job/extensions/insert/InsertTask.java    |  4 +-
 .../insert/streaming/StreamingInsertJob.java       | 52 +++++++++++++++
 .../insert/streaming/StreamingInsertTask.java      | 73 ++++++++++++++++++---
 .../insert/streaming/StreamingJobProperties.java   |  1 +
 .../streaming/StreamingJobSchedulerTask.java       | 75 +---------------------
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  2 +-
 .../job/scheduler/StreamingTaskScheduler.java      | 11 +++-
 .../doris/tablefunction/MetadataGenerator.java     | 23 +++++--
 .../streaming_job/test_streaming_insert_job.groovy | 17 +++--
 .../test_streaming_insert_job_offset.groovy        |  6 +-
 12 files changed, 169 insertions(+), 102 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9597815fcc4..1b386f98588 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1980,6 +1980,11 @@ public class Config extends ConfigBase {
                     + "the value should be greater than 0, if it is <=0, 
default is 1024."})
     public static int max_streaming_job_num = 1024;
 
+    @ConfField(masterOnly = true, description = {"一个 Streaming Job 在内存中最多保留的 
task的数量,超过将丢弃旧的记录",
+            "The maximum number of tasks a Streaming Job can keep in memory. 
If the number exceeds the limit, "
+                    + "old records will be discarded."})
+    public static int max_streaming_task_show_count = 100;
+
     /* job test config */
     /**
      * If set to true, we will allow the interval unit to be set to second, 
when creating a recurring job.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 305089378a0..90a5f0dd895 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -104,6 +104,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("EndOffset", ScalarType.createStringType()))
             .add(new Column("LoadStatistic", ScalarType.createStringType()))
             .add(new Column("ErrorMsg", ScalarType.createStringType()))
+            .add(new Column("JobRuntimeMsg", ScalarType.createStringType()))
             .build();
 
     public static final ShowResultSetMetaData TASK_META_DATA =
@@ -567,6 +568,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         trow.addToColumnValue(new TCell().setStringVal(
                 loadStatistic == null ? FeConstants.null_string : 
loadStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index aa1ecc02a9f..69ce9c309be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()),
-            new Column("FirstErrorMsg", ScalarType.createStringType()));
+            new Column("FirstErrorMsg", ScalarType.createStringType()),
+            new Column("RunningOffset", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -297,6 +298,7 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c1c9a77b1c1..b4accf1b15e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -80,9 +80,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -117,6 +119,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     private long lastScheduleTaskTimestamp = -1L;
     private InsertIntoTableCommand baseCommand;
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+    private ConcurrentLinkedQueue<StreamingInsertTask> streamInsertTaskQueue = 
new ConcurrentLinkedQueue<>();
+    @Setter
+    @Getter
+    private String jobRuntimeMsg = "";
 
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
@@ -256,6 +262,20 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    @Override
+    public void cancelAllTasks(boolean needWaitCancelComplete) throws 
JobException {
+        lock.writeLock().lock();
+        try {
+            if (runningStreamTask == null) {
+                return;
+            }
+            runningStreamTask.cancel(needWaitCancelComplete);
+            canceledTaskCount.incrementAndGet();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     @Override
     public JobType getJobType() {
         return JobType.INSERT;
@@ -298,9 +318,35 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
         log.info("create new streaming insert task for job {}, task {} ",
                 getJobId(), runningStreamTask.getTaskId());
+        recordTasks(runningStreamTask);
         return runningStreamTask;
     }
 
+    public void recordTasks(StreamingInsertTask task) {
+        if (Config.max_streaming_task_show_count < 1) {
+            return;
+        }
+        streamInsertTaskQueue.add(task);
+
+        while (streamInsertTaskQueue.size() > 
Config.max_streaming_task_show_count) {
+            streamInsertTaskQueue.poll();
+        }
+    }
+
+    /**
+     * for show command to display all streaming insert tasks of this job.
+     */
+    public List<StreamingInsertTask> queryAllStreamTasks() {
+        if (CollectionUtils.isEmpty(streamInsertTaskQueue)) {
+            return new ArrayList<>();
+        }
+        List<StreamingInsertTask> tasks = new 
ArrayList<>(streamInsertTaskQueue);
+        Comparator<StreamingInsertTask> taskComparator =
+                
Comparator.comparingLong(StreamingInsertTask::getCreateTimeMs).reversed();
+        tasks.sort(taskComparator);
+        return tasks;
+    }
+
     protected void fetchMeta() {
         try {
             if (originTvfProps == null) {
@@ -480,6 +526,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 jobStatistic == null ? FeConstants.null_string : 
jobStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failureReason == null
                 ? FeConstants.null_string : failureReason.getMsg()));
+        trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
+                ? FeConstants.null_string : jobRuntimeMsg));
         return trow;
     }
 
@@ -703,6 +751,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             setCanceledTaskCount(new AtomicLong(0));
         }
 
+        if (null == streamInsertTaskQueue) {
+            streamInsertTaskQueue = new ConcurrentLinkedQueue<>();
+        }
+
         if (null == lock) {
             this.lock = new ReentrantReadWriteLock(true);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d9fa4b918bb..68f40be923c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,7 +19,9 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.job.base.Job;
 import org.apache.doris.job.common.TaskStatus;
@@ -27,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.job.offset.s3.S3Offset;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -35,12 +37,17 @@ import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TStatusCode;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,8 +63,6 @@ public class StreamingInsertTask {
     @Setter
     private TaskStatus status;
     private String errMsg;
-    @Setter
-    private String otherMsg;
     private Long createTimeMs;
     private Long startTimeMs;
     private Long finishTimeMs;
@@ -154,17 +159,11 @@ public class StreamingInsertTask {
     }
 
     private void run() throws JobException {
-        StreamingInsertJob job =
-                (StreamingInsertJob) 
Env.getCurrentEnv().getJobManager().getJob(getJobId());
-        StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
-        log.info("current running stream task id is {} for job id {}",
-                runningStreamTask == null ? -1 : 
runningStreamTask.getTaskId(), getJobId());
         if (isCanceled.get()) {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming insert task, label {}, offset is {}, 
filepath {}",
-                labelName, runningOffset.toString(), ((S3Offset) 
runningOffset).getFileLists());
+        log.info("start to run streaming insert task, label {}, offset is {}", 
labelName, runningOffset.toString());
         String errMsg = null;
         try {
             taskCommand.run(ctx, stmtExecutor);
@@ -221,10 +220,12 @@ public class StreamingInsertTask {
                 || TaskStatus.CANCELED.equals(status)) {
             return;
         }
+        status = TaskStatus.CANCELED;
         if (isCanceled.get()) {
             return;
         }
         isCanceled.getAndSet(true);
+        this.errMsg = "task cancelled";
         if (null != stmtExecutor) {
             log.info("cancelling streaming insert task, job id is {}, task id 
is {}",
                     getJobId(), getTaskId());
@@ -254,4 +255,56 @@ public class StreamingInsertTask {
         }
         return false;
     }
+
+    /**
+     * show streaming insert task info detail
+     */
+    public TRow getTvfInfo(String jobName) {
+        TRow trow = new TRow();
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(this.getTaskId())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(this.getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
+        trow.addToColumnValue(new TCell().setStringVal(this.getLabelName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(this.getStatus().name()));
+        // err msg
+        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+                ? errMsg : FeConstants.null_string));
+
+        // create time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(this.getCreateTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
+                : TimeUtils.longToTimeString(this.getStartTimeMs())));
+        // load end time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(this.getFinishTimeMs())));
+
+        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+                .queryLoadJobsByJobIds(Arrays.asList(this.getTaskId()));
+        if (!loadJobs.isEmpty()) {
+            LoadJob loadJob = loadJobs.get(0);
+            if (loadJob.getLoadingStatus() != null && 
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+
+            if (loadJob.getLoadStatistic() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        }
+
+        if (this.getUserIdentity() == null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
+        }
+        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
+                ? FeConstants.null_string : runningOffset.toString()));
+        return trow;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 207509d57fe..0f20dbd4c1e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -123,6 +123,7 @@ public class StreamingJobProperties implements 
JobProperties {
         if (!sessionVarMap.isEmpty()) {
             try {
                 sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+                sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
                 sessionVariable.readFromMap(sessionVarMap);
             } catch (Exception e) {
                 throw new JobException("Invalid session variable, " + 
e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a7a26596e62..7f483a8f587 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,24 +17,15 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
-import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
 import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Arrays;
-import java.util.List;
 
 @Log4j2
 public class StreamingJobSchedulerTask extends AbstractTask {
@@ -110,72 +101,12 @@ public class StreamingJobSchedulerTask extends 
AbstractTask {
 
     @Override
     protected void executeCancelLogic(boolean needWaitCancelComplete) throws 
Exception {
-        log.info("cancelling streaming insert job scheduler task for job id 
{}", streamingInsertJob.getJobId());
-        if (streamingInsertJob.getRunningStreamTask() != null) {
-            
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
-        }
+        // cancel logic in streaming insert task
     }
 
     @Override
     public TRow getTvfInfo(String jobName) {
-        StreamingInsertTask runningTask = 
streamingInsertJob.getRunningStreamTask();
-        if (runningTask == null) {
-            return null;
-        }
-        if (!streamingInsertJob.needScheduleTask()) {
-            //todo: should list history task
-            return null;
-        }
-        TRow trow = new TRow();
-        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
-        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
-        trow.addToColumnValue(new TCell().setStringVal(jobName));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getLabelName()));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getStatus().name()));
-        // err msg
-        String errMsg = "";
-        if (StringUtils.isNotBlank(runningTask.getErrMsg())
-                && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
-            errMsg = runningTask.getErrMsg();
-        } else {
-            errMsg = runningTask.getOtherMsg();
-        }
-        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
-                ? errMsg : FeConstants.null_string));
-
-        // create time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
-        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
-                : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
-        // load end time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
-
-        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
-                .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
-        if (!loadJobs.isEmpty()) {
-            LoadJob loadJob = loadJobs.get(0);
-            if (loadJob.getLoadingStatus() != null && 
loadJob.getLoadingStatus().getTrackingUrl() != null) {
-                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
-            } else {
-                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            }
-
-            if (loadJob.getLoadStatistic() != null) {
-                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
-            } else {
-                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            }
-        } else {
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        }
-
-        if (runningTask.getUserIdentity() == null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        } else {
-            trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
-        }
-        trow.addToColumnValue(new TCell().setStringVal(""));
-        return trow;
+        // only show streaming insert task info in job tvf
+        return null;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 0e5ae417e73..1605ada23d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -50,6 +50,6 @@ public class S3Offset implements Offset {
 
     @Override
     public String toString() {
-        return "{\"endFile\": \"" + endFile + "\" }";
+        return "{\"endFile\": \"" + endFile + "\"}";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 7e99ca3ada9..27a15fb959e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -52,6 +52,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
     private final ScheduledThreadPoolExecutor delayScheduler
                 = new ScheduledThreadPoolExecutor(1, new 
CustomThreadFactory("streaming-task-delay-scheduler"));
 
+    private static long DELAY_SCHEDULER_MS = 500;
+
     public StreamingTaskScheduler() {
         super("Streaming-task-scheduler", 1);
     }
@@ -114,12 +116,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
         }
         // reject task if no more data to consume
         if (!job.hasMoreDataToConsume()) {
-            scheduleTaskWithDelay(task, 500);
+            String delayMsg = "No data available for consumption at the 
moment, will retry after "
+                    + (System.currentTimeMillis() + DELAY_SCHEDULER_MS);
+            job.setJobRuntimeMsg(delayMsg);
+            scheduleTaskWithDelay(task, DELAY_SCHEDULER_MS);
             return;
         }
         log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
         job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+        // clear delay msg
+        job.setJobRuntimeMsg("");
         long start = System.currentTimeMillis();
         try {
             task.execute();
@@ -131,8 +138,6 @@ public class StreamingTaskScheduler extends MasterDaemon {
     }
 
     private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
-        task.setOtherMsg("No data available for consumption at the moment, 
will retry after "
-                + (System.currentTimeMillis() + delayMs));
         delayScheduler.schedule(() -> {
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
         }, delayMs, TimeUnit.MILLISECONDS);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 2ce92cb1b1e..c40dd0a03f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -68,6 +68,7 @@ import 
org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.BaseTableInfo;
@@ -1228,11 +1229,23 @@ public class MetadataGenerator {
                     continue;
                 }
             }
-            List<AbstractTask> tasks = job.queryAllTasks();
-            for (AbstractTask task : tasks) {
-                TRow tvfInfo = task.getTvfInfo(job.getJobName());
-                if (tvfInfo != null) {
-                    dataBatch.add(tvfInfo);
+
+            if (job instanceof StreamingInsertJob) {
+                StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+                List<StreamingInsertTask> streamingInsertTasks = 
streamingJob.queryAllStreamTasks();
+                for (StreamingInsertTask task : streamingInsertTasks) {
+                    TRow tvfInfo = task.getTvfInfo(job.getJobName());
+                    if (tvfInfo != null) {
+                        dataBatch.add(tvfInfo);
+                    }
+                }
+            } else {
+                List<AbstractTask> tasks = job.queryAllTasks();
+                for (AbstractTask task : tasks) {
+                    TRow tvfInfo = task.getTvfInfo(job.getJobName());
+                    if (tvfInfo != null) {
+                        dataBatch.add(tvfInfo);
+                    }
                 }
             }
         }
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 93dc64dfd4e..1e65397742a 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -88,12 +88,12 @@ suite("test_streaming_insert_job") {
         PAUSE JOB where jobname =  '${jobName}'
     """
     def pausedJobStatus = sql """
-        select status from jobs("type"="insert") where Name='${jobName}'
+        select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount 
from jobs("type"="insert") where Name='${jobName}'
     """
     assert pausedJobStatus.get(0).get(0) == "PAUSED"
 
-    def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
-    assert pauseShowTask.size() == 0
+    def pauseShowTask = sql """select count(1) from tasks("type"="insert") 
where JobName='${jobName}'"""
+    assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
 
     // check encrypt sk
     def jobExecuteSQL = sql """
@@ -141,19 +141,22 @@ suite("test_streaming_insert_job") {
         RESUME JOB where jobname =  '${jobName}'
     """
     def resumeJobStatus = sql """
-        select status,properties,currentOffset from jobs("type"="insert") 
where Name='${jobName}'
+        select status,properties,currentOffset, SucceedTaskCount + 
FailedTaskCount + CanceledTaskCount from jobs("type"="insert") where 
Name='${jobName}'
     """
     assert resumeJobStatus.get(0).get(0) == "RUNNING" || 
resumeJobStatus.get(0).get(0) == "PENDING"
     assert resumeJobStatus.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
     assert resumeJobStatus.get(0).get(2) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
 
+
     Awaitility.await().atMost(60, SECONDS)
             .pollInterval(1, SECONDS).until(
             {
                 print("check create streaming task count")
-                def resumeShowTask = sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""
-                // check streaming task create success
-                resumeShowTask.size() == 1
+                def resumeShowTask = sql """select count(1) from 
tasks("type"="insert") where JobName='${jobName}'"""
+                def lastTaskStatus = sql """select status from 
tasks("type"="insert") where JobName='${jobName}' limit 1 """
+                // A new task is generated
+                resumeShowTask.get(0).get(0) > resumeJobStatus.get(0).get(3) &&
+                        ( lastTaskStatus.get(0).get(0) == "PENDING" || 
lastTaskStatus.get(0).get(0) == "RUNNING" )
             }
     )
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 31f8421ff42..f1c2f91e00b 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -160,12 +160,12 @@ suite("test_streaming_insert_job_offset") {
         PAUSE JOB where jobname =  '${jobName}'
     """
     def pausedJobStatus = sql """
-        select status from jobs("type"="insert") where Name='${jobName}'
+        select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount  
from jobs("type"="insert") where Name='${jobName}'
     """
     assert pausedJobStatus.get(0).get(0) == "PAUSED"
 
-    def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
-    assert pauseShowTask.size() == 0
+    def pauseShowTask = sql """select count(1) from tasks("type"="insert") 
where JobName='${jobName}'"""
+    assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
 
     def jobInfo = sql """
         select currentOffset, endoffset, loadStatistic, properties from 
jobs("type"="insert") where Name='${jobName}'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to