This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new cc4c616df09 [Feature](WIP) add StreamingInsertTask and improve 
StreamInsertJob (#55862)
cc4c616df09 is described below

commit cc4c616df092be1928f9a93657b590641faf8cd9
Author: wudi <[email protected]>
AuthorDate: Thu Sep 11 09:45:50 2025 +0800

    [Feature](WIP) add StreamingInsertTask and improve StreamInsertJob (#55862)
    
    ### What problem does this PR solve?
    
    1. add StreamingInsertTask For StreamJob
    2. Improve StreamInsertJob
    3. add InsertCommand support for rewriting TVF params
---
 .../doris/job/extensions/insert/InsertJob.java     |   2 +-
 .../insert/streaming/StreamingInsertJob.java       | 195 +++++++++++++++++---
 .../insert/streaming/StreamingInsertTask.java      | 196 ++++++++++++++++++++-
 .../insert/streaming/StreamingJobProperties.java   |  28 ++-
 .../streaming/StreamingJobSchedulerTask.java       |  33 +++-
 .../org/apache/doris/job/manager/JobManager.java   |  17 ++
 .../doris/job/offset/SourceOffsetProvider.java     |  14 +-
 .../job/offset/SourceOffsetProviderFactory.java    |  19 +-
 .../org/apache/doris/job/offset/s3/S3Offset.java   |   5 +-
 .../job/offset/s3/S3SourceOffsetProvider.java      |  35 +++-
 .../job/scheduler/StreamingTaskScheduler.java      |  11 +-
 .../org/apache/doris/job/task/AbstractTask.java    |   2 +-
 .../trees/plans/commands/AlterJobCommand.java      |   1 -
 .../trees/plans/commands/CreateJobCommand.java     |  12 ++
 .../trees/plans/commands/info/CreateJobInfo.java   |   6 +-
 .../commands/insert/InsertIntoTableCommand.java    |  47 +++++
 16 files changed, 568 insertions(+), 55 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 5b386886b19..e855aa4f836 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -105,7 +105,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("ErrorMsg", ScalarType.createStringType()))
             .build();
 
-    private static final ShowResultSetMetaData TASK_META_DATA =
+    public static final ShowResultSetMetaData TASK_META_DATA =
             ShowResultSetMetaData.builder()
                     .addColumn(new Column("TaskId", 
ScalarType.createVarchar(80)))
                     .addColumn(new Column("Label", 
ScalarType.createVarchar(80)))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a4bbf51b8a4..c88fd019c46 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -18,46 +18,70 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.PauseReason;
+import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.job.offset.SourceOffsetProviderFactory;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.parser.NereidsParser;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+import org.apache.doris.transaction.TransactionException;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TxnStateChangeCallback;
 
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.collections.CollectionUtils;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> {
+public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> implements
+        TxnStateChangeCallback {
 
     @SerializedName("did")
     private final long dbId;
-
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    @SerializedName("fm")
+    private FailMsg failMsg;
     @Getter
     protected PauseReason pauseReason;
-
     @Getter
     @Setter
     protected long latestAutoResumeTimestamp;
-
     @Getter
     @Setter
     protected long autoResumeCount;
-
     @Getter
     @SerializedName("jp")
     private StreamingJobProperties jobProperties;
-
+    @Getter
+    StreamingInsertTask runningStreamTask;
+    SourceOffsetProvider offsetProvider;
     private long lastScheduleTaskTimestamp = -1L;
 
     public StreamingInsertJob(String jobName,
@@ -73,6 +97,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 jobConfig, createTimeMs, executeSql);
         this.dbId = ConnectContext.get().getCurrentDbId();
         this.jobProperties = jobProperties;
+        String tvfType = parseTvfType();
+        this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+    }
+
+    private String parseTvfType() {
+        NereidsParser parser = new NereidsParser();
+        InsertIntoTableCommand command = (InsertIntoTableCommand) 
parser.parseSingle(getExecuteSql());
+        return command.getFirstTvfName();
     }
 
     @Override
@@ -80,35 +112,109 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         super.updateJobStatus(status);
     }
 
-    protected void createStreamingInsertTask() {
+    @Override
+    public JobType getJobType() {
+        return JobType.INSERT;
+    }
+
+    @Override
+    protected void checkJobParamsInternal() {
+    }
+
+    @Override
+    public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
+        return CollectionUtils.isEmpty(getRunningTasks());
+    }
+
+    @Override
+    public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, 
Map<Object, Object> taskContext) {
+        List<StreamingJobSchedulerTask> newTasks = new ArrayList<>();
+        StreamingJobSchedulerTask streamingJobSchedulerTask = new 
StreamingJobSchedulerTask(this);
+        newTasks.add(streamingJobSchedulerTask);
+        super.initTasks(newTasks, taskType);
+        return newTasks;
+    }
+
+    protected StreamingInsertTask createStreamingInsertTask() {
+        InsertIntoTableCommand command = 
offsetProvider.rewriteTvfParams(getExecuteSql());
+        this.runningStreamTask = new StreamingInsertTask(getJobId(), 
AbstractTask.getNextTaskId(), command,
+                loadStatistic, getCurrentDbName(), 
offsetProvider.getCurrentOffset(), jobProperties);
+        return this.runningStreamTask;
     }
 
     protected void fetchMeta() {
+        offsetProvider.fetchRemoteMeta();
+    }
+
+    public boolean needScheduleTask() {
+        return (getJobStatus().equals(JobStatus.RUNNING) || 
getJobStatus().equals(JobStatus.PENDING));
+    }
+
+    // When the consumer reaches EOF, delaying the next task schedule 
appropriately avoids creating too many small transactions.
+    public boolean needDelayScheduleTask() {
+        return System.currentTimeMillis() - lastScheduleTaskTimestamp > 
jobProperties.getMaxIntervalSecond() * 1000;
+    }
+
+    public boolean hasMoreDataToConsume() {
+        return offsetProvider.hasMoreDataToConsume();
     }
 
     @Override
-    public JobType getJobType() {
-        return JobType.INSERT;
+    public void onTaskFail(StreamingJobSchedulerTask task) throws JobException 
{
+        // This handles failure of the StreamingJobSchedulerTask itself; no 
further processing is required beyond removing it.
+        getRunningTasks().remove(task);
     }
 
     @Override
-    protected void checkJobParamsInternal() {
+    public void onTaskSuccess(StreamingJobSchedulerTask task) throws 
JobException {
+        // This handles success of the StreamingJobSchedulerTask itself; no 
further processing is required beyond removing it.
+        getRunningTasks().remove(task);
+    }
+
+    public void onStreamTaskFail(StreamingInsertTask task) throws JobException 
{
+        if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+            this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, 
task.getErrMsg());
+        }
+        updateJobStatus(JobStatus.PAUSED);
     }
 
+    public void onStreamTaskSuccess(StreamingInsertTask task) {
+        StreamingInsertTask nextTask = createStreamingInsertTask();
+        this.runningStreamTask = nextTask;
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
+    }
+
+
     @Override
-    public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
-        return true;
+    public ShowResultSetMetaData getTaskMetaData() {
+        return InsertJob.TASK_META_DATA;
     }
 
     @Override
-    public java.util.List<StreamingJobSchedulerTask> 
createTasks(org.apache.doris.job.common.TaskType taskType,
-                                                                 Map<Object, 
Object> taskContext) {
-        return java.util.Collections.emptyList();
+    public List<String> getShowInfo() {
+        return getCommonShowInfo();
     }
 
     @Override
-    public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() {
-        return org.apache.doris.qe.ShowResultSetMetaData.builder().build();
+    public TRow getTvfInfo() {
+        TRow trow = new TRow();
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(getJobName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getCreateUser().getQualifiedUser()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().getExecuteType().name()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
+        trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
+        trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
+        trow.addToColumnValue(new TCell().setStringVal(getComment()));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new 
TCell().setStringVal(loadStatistic.toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
+        return trow;
     }
 
     @Override
@@ -119,7 +225,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public List<StreamingJobSchedulerTask> queryTasks() {
-        return new ArrayList<>();
+        if (!getRunningTasks().isEmpty()) {
+            return getRunningTasks();
+        } else {
+            return Arrays.asList(new StreamingJobSchedulerTask(this));
+        }
     }
 
     @Override
@@ -127,17 +237,50 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
-    public boolean needScheduleTask() {
-        return (getJobStatus().equals(JobStatus.RUNNING) || 
getJobStatus().equals(JobStatus.PENDING));
+    @Override
+    public long getId() {
+        return getJobId();
     }
 
-    // When consumer to EOF, delay schedule task appropriately can avoid too 
many small transactions.
-    public boolean needDelayScheduleTask() {
-        return System.currentTimeMillis() - lastScheduleTaskTimestamp > 
jobProperties.getMaxIntervalSecond() * 1000;
+    @Override
+    public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
+
     }
 
-    public boolean hasMoreDataToConsume() {
-        // TODO: implement this
-        return true;
+    @Override
+    public void beforeAborted(TransactionState txnState) throws 
TransactionException {
+
+    }
+
+    @Override
+    public void afterCommitted(TransactionState txnState, boolean txnOperated) 
throws UserException {
+
+    }
+
+    @Override
+    public void replayOnCommitted(TransactionState txnState) {
+
+    }
+
+    @Override
+    public void afterAborted(TransactionState txnState, boolean txnOperated, 
String txnStatusChangeReason)
+            throws UserException {
+
+    }
+
+    @Override
+    public void replayOnAborted(TransactionState txnState) {
+
+    }
+
+    @Override
+    public void afterVisible(TransactionState txnState, boolean txnOperated) {
+
+    }
+
+    @Override
+    public void replayOnVisible(TransactionState txnState) {
+
+
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 4e07ee01e30..56a59d3a7f6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -17,20 +17,206 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.job.base.Job;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertTask;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TStatusCode;
+
 import lombok.Getter;
+import lombok.extern.log4j.Log4j2;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+@Log4j2
+@Getter
 public class StreamingInsertTask {
-    @Getter
+    private static final String LABEL_SPLITTER = "_";
+    private static final int MAX_RETRY = 3;
     private long jobId;
-
-    @Getter
     private long taskId;
+    private String labelName;
+    private TaskStatus status;
+    private String errMsg;
+    private Long createTimeMs;
+    private Long startTimeMs;
+    private Long finishTimeMs;
+    private InsertIntoTableCommand command;
+    private StmtExecutor stmtExecutor;
+    private String currentDb;
+    private UserIdentity userIdentity;
+    private ConnectContext ctx;
+    private LoadStatistic loadStatistic;
+    private Offset offset;
+    private AtomicBoolean isCanceled = new AtomicBoolean(false);
+    private StreamingJobProperties jobProperties;
 
-    public StreamingInsertTask(long jobId, long taskId) {
+    public StreamingInsertTask(long jobId,
+                               long taskId,
+                               InsertIntoTableCommand command,
+                               LoadStatistic loadStatistic,
+                               String currentDb,
+                               Offset offset,
+                               StreamingJobProperties jobProperties) {
         this.jobId = jobId;
         this.taskId = taskId;
+        this.command = command;
+        this.loadStatistic = loadStatistic;
+        this.userIdentity = ctx.getCurrentUserIdentity();
+        this.currentDb = currentDb;
+        this.offset = offset;
+        this.jobProperties = jobProperties;
+        this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
+        this.createTimeMs = System.currentTimeMillis();
+    }
+
+    public void execute() throws JobException {
+        try {
+            before();
+            run();
+            onSuccess();
+        } catch (Exception e) {
+            if (TaskStatus.CANCELED.equals(status)) {
+                return;
+            }
+            onFail(e.getMessage());
+            log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
+        } finally {
+            // The cancel logic will call the closeOrReleaseResources method 
by itself.
+            // If it is also called here,
+            // it may result in the inability to obtain relevant information 
when canceling the task
+            if (!TaskStatus.CANCELED.equals(status)) {
+                closeOrReleaseResources();
+            }
+        }
+    }
+
+    private void before() throws JobException {
+        this.startTimeMs = System.currentTimeMillis();
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task 
id: {}", getTaskId());
+        }
+        ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
+        ctx.setSessionVariable(jobProperties.getSessionVariable());
+        StatementContext statementContext = new StatementContext();
+        ctx.setStatementContext(statementContext);
+        this.command.setLabelName(Optional.of(this.labelName));
+        this.command.setJobId(getTaskId());
+        stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
+    }
+
+    private void run() throws JobException {
+        String errMsg = null;
+        int retry = 0;
+        while (retry <= MAX_RETRY) {
+            try {
+                if (isCanceled.get()) {
+                    log.info("task has been canceled, task id is {}", 
getTaskId());
+                    return;
+                }
+                command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
+                if (ctx.getState().getStateType() == 
QueryState.MysqlStateType.OK) {
+                    return;
+                } else {
+                    errMsg = ctx.getState().getErrorMessage();
+                }
+                log.error(
+                        "streaming insert failed with {}, reason {}, to retry",
+                        command.getLabelName(),
+                        errMsg);
+                if (retry == MAX_RETRY) {
+                    errMsg = "reached max retry times, failed with" + errMsg;
+                }
+            } catch (Exception e) {
+                log.warn("execute insert task error, label is {},offset is 
{}", command.getLabelName(),
+                         offset.toJson(), e);
+                errMsg = Util.getRootCauseMessage(e);
+            }
+            retry++;
+        }
+        log.error("streaming insert task failed, job id is {}, task id is {}, 
offset is {}, errMsg is {}",
+                getJobId(), getTaskId(), offset.toJson(), errMsg);
+        throw new JobException(errMsg);
+    }
+
+    public boolean onSuccess() throws JobException {
+        if (TaskStatus.CANCELED.equals(status)) {
+            return false;
+        }
+        this.status = TaskStatus.SUCCESS;
+        this.finishTimeMs = System.currentTimeMillis();
+        if (!isCallable()) {
+            return false;
+        }
+        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+        if (null == job) {
+            log.info("job is null, job id is {}", jobId);
+            return false;
+        }
+
+        StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
+        streamingInsertJob.onStreamTaskSuccess(this);
+        return true;
+    }
+
+    public void onFail(String errMsg) throws JobException {
+        this.errMsg = errMsg;
+        if (TaskStatus.CANCELED.equals(status)) {
+            return;
+        }
+        this.status = TaskStatus.FAILED;
+        this.finishTimeMs = System.currentTimeMillis();
+        if (!isCallable()) {
+            return;
+        }
+        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+        StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
+        streamingInsertJob.onStreamTaskFail(this);
+    }
+
+    public void cancel(boolean needWaitCancelComplete) throws Exception {
+        if (isCanceled.get()) {
+            return;
+        }
+        isCanceled.getAndSet(true);
+        if (null != stmtExecutor) {
+            stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming 
insert task cancelled"),
+                    needWaitCancelComplete);
+        }
+    }
+
+    public void closeOrReleaseResources() {
+        if (null != stmtExecutor) {
+            stmtExecutor = null;
+        }
+        if (null != command) {
+            command = null;
+        }
+        if (null != ctx) {
+            ctx = null;
+        }
     }
 
-    public void execute() {
+    private boolean isCallable() {
+        if (status.equals(TaskStatus.CANCELED)) {
+            return false;
+        }
+        if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
+            return true;
+        }
+        return false;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 25d256b127b..4f463a090d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -20,9 +20,13 @@ package org.apache.doris.job.extensions.insert.streaming;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.job.base.JobProperties;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.SessionVariable;
 
 import lombok.Data;
 
+import java.util.HashMap;
 import java.util.Map;
 
 @Data
@@ -30,13 +34,16 @@ public class StreamingJobProperties implements 
JobProperties {
     public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
     public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
     public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+    public static final String SESSION_VAR_PREFIX = "session.";
+
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_S3_BATCH_FILES = 256;
     public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; 
// 10GB
-    public static final long DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+    public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
 
     private final Map<String, String> properties;
     private long maxIntervalSecond;
+    private int  maxRetry;
     private long s3BatchFiles;
     private long s3BatchSize;
 
@@ -61,4 +68,23 @@ public class StreamingJobProperties implements JobProperties 
{
                         && v <= (long) (1024 * 1024 * 1024) * 10,
                 StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should 
between 100MB and 10GB");
     }
+
+    public SessionVariable getSessionVariable() throws JobException {
+        final Map<String, String> sessionVarMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(SESSION_VAR_PREFIX)) {
+                String subKey = 
entry.getKey().substring(SESSION_VAR_PREFIX.length());
+                sessionVarMap.put(subKey, entry.getValue());
+            }
+        }
+
+        SessionVariable sessionVariable = new SessionVariable();
+        try {
+            sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+            sessionVariable.readFromJson(GsonUtils.GSON.toJson(sessionVarMap));
+        } catch (Exception e) {
+            throw new JobException("Invalid session variable, " + 
e.getMessage());
+        }
+        return sessionVariable;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 888669f428a..d724b09fbf0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -18,17 +18,17 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.PauseReason;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
 public class StreamingJobSchedulerTask extends AbstractTask {
-
     private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
     private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
-
     private StreamingInsertJob streamingInsertJob;
 
     public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) {
@@ -80,14 +80,43 @@ public class StreamingJobSchedulerTask extends AbstractTask 
{
 
     @Override
     protected void closeOrReleaseResources() {
+        if (streamingInsertJob.getRunningStreamTask() != null) {
+            
streamingInsertJob.getRunningStreamTask().closeOrReleaseResources();
+        }
     }
 
     @Override
     protected void executeCancelLogic(boolean needWaitCancelComplete) throws 
Exception {
+        if (streamingInsertJob.getRunningStreamTask() != null) {
+            
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
+        }
     }
 
     @Override
     public TRow getTvfInfo(String jobName) {
+        StreamingInsertTask runningTask = 
streamingInsertJob.getRunningStreamTask();
+        TRow trow = new TRow();
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
+        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getLabelName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getStatus().name()));
+        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getErrMsg()));
+        // create time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? ""
+                : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
+        // load end time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
+        // tracking url
+        trow.addToColumnValue(new TCell().setStringVal("trackingUrl"));
+        trow.addToColumnValue(new TCell().setStringVal("statistic"));
+        if (runningTask.getUserIdentity() == null) {
+            trow.addToColumnValue(new TCell().setStringVal(""));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
+        }
+        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getOffset().toJson()));
         return null;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index ae6bad07066..9954d26b6f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
+import lombok.Getter;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -73,6 +74,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
 
     private JobScheduler<T, C> jobScheduler;
 
+    @Getter
     private StreamingTaskScheduler streamingTaskScheduler;
 
     // lock for job
@@ -112,6 +114,21 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         return jobMap.get(jobId);
     }
 
+    /**
+     * Count the registered jobs whose execute type is STREAMING.
+     *
+     * @return number of streaming jobs
+     */
+    public int getStreamingJobCnt() {
+        int count = 0;
+        for (T job : jobMap.values()) {
+            if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
     public void registerJob(T job) throws JobException {
         job.initParams();
         createJobInternal(job, false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index f88079617de..3d620739290 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -36,11 +36,17 @@ public interface SourceOffsetProvider {
     Offset getNextOffset();
 
     /**
-     * Rewrite the TVF parameters in the InsertIntoTableCommand based on the 
current offset.
-     * @param command
+     * Get current offset
      * @return
      */
-    InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand 
command);
+    Offset getCurrentOffset();
+
+    /**
+     * Rewrite the TVF parameters in the SQL based on the current offset.
+     * @param sql
+     * @return rewritten InsertIntoTableCommand
+     */
+    InsertIntoTableCommand rewriteTvfParams(String sql);
 
     /**
      * Update the progress of the source.
@@ -57,6 +63,6 @@ public interface SourceOffsetProvider {
      * Whether there is more data to consume
      * @return
      */
-    boolean hasMoreData();
+    boolean hasMoreDataToConsume();
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
index 5ba1d903d78..9cefa4e9d42 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -17,11 +17,15 @@
 
 package org.apache.doris.job.offset;
 
+import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
 
+import lombok.extern.log4j.Log4j2;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+@Log4j2
 public class SourceOffsetProviderFactory {
     private static final Map<String, Class<? extends SourceOffsetProvider>> 
map = new ConcurrentHashMap<>();
 
@@ -29,9 +33,16 @@ public class SourceOffsetProviderFactory {
         map.put("s3", S3SourceOffsetProvider.class);
     }
 
-    public static SourceOffsetProvider createSourceOffsetProvider(String 
sourceType) throws InstantiationException,
-            IllegalAccessException {
-        Class<? extends SourceOffsetProvider> cla = 
map.get(sourceType.toUpperCase());
-        return cla.newInstance();
+    public static SourceOffsetProvider createSourceOffsetProvider(String 
sourceType) {
+        try {
+            Class<? extends SourceOffsetProvider> cla = 
map.get(sourceType.toUpperCase());
+            if (cla == null) {
+                throw new JobException("Unsupported source type: " + 
sourceType);
+            }
+            return cla.newInstance();
+        } catch (Exception e) {
+            log.error("Failed to create source provider for type: " + 
sourceType, e);
+            throw new RuntimeException("Failed to create source provider for 
type: " + sourceType);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 86ff467796a..a175575757f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -20,13 +20,14 @@ package org.apache.doris.job.offset.s3;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import lombok.Getter;
+
 import java.util.List;
 
 public class S3Offset implements Offset {
     String startFile;
-
     String endFile;
-
+    @Getter
     List<String> fileLists;
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 087d9c2beb7..771736a9559 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,35 +19,60 @@ package org.apache.doris.job.offset.s3;
 
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
+    S3Offset currentOffset;
+    String maxRemoteEndFile;
 
     @Override
     public String getSourceType() {
-        return null;
+        return "s3";
     }
 
     @Override
-    public Offset getNextOffset() {
+    public S3Offset getNextOffset() {
+        // TODO: list remote objects starting after the current offset's end file
         return null;
     }
 
     @Override
-    public InsertIntoTableCommand 
rewriteTvfParamsInCommand(InsertIntoTableCommand command) {
-        return null;
+    public Offset getCurrentOffset() {
+        return currentOffset;
+    }
+
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(String sql) {
+        S3Offset nextOffset = getNextOffset();
+        Map<String, String> props = new HashMap<>();
+        // TODO: convert the file list into a glob pattern string instead of using List.toString()
+        props.put("uri", nextOffset.getFileLists().toString());
+
+        NereidsParser parser = new NereidsParser();
+        InsertIntoTableCommand command = (InsertIntoTableCommand) 
parser.parseSingle(sql);
+        command.rewriteTvfProperties(getSourceType(), props);
+        return command;
     }
 
     @Override
     public void updateProgress(Offset offset) {
+        this.currentOffset = (S3Offset) offset;
     }
 
     @Override
     public void fetchRemoteMeta() {
+        // TODO: list remote objects to refresh the max remote end file
     }
 
     @Override
-    public boolean hasMoreData() {
+    public boolean hasMoreDataToConsume() {
+        if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) {
+            return true;
+        }
         return false;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index d87a2b7833d..bdc27e983e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
 
@@ -75,11 +76,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
 
     private void scheduleTasks(List<StreamingInsertTask> tasks) {
         for (StreamingInsertTask task : tasks) {
-            threadPool.execute(() -> scheduleOneTask(task));
+            threadPool.execute(() -> {
+                try {
+                    scheduleOneTask(task);
+                } catch (Exception e) {
+                    log.error("Failed to schedule task, task id: {}, job id: 
{}", task.getTaskId(), task.getJobId(), e);
+                }
+            });
         }
     }
 
-    private void scheduleOneTask(StreamingInsertTask task) {
+    private void scheduleOneTask(StreamingInsertTask task) throws JobException 
{
         StreamingInsertJob job = (StreamingInsertJob) 
Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
         if (job == null) {
             log.warn("Job not found, job id: {}", task.getJobId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 4e2ac653cf7..18c5f525295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask implements Task {
         taskId = getNextTaskId();
     }
 
-    private static long getNextTaskId() {
+    public static long getNextTaskId() {
         // do not use Env.getNextId(), just generate id without logging
         return System.nanoTime() + RandomUtils.nextInt();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index ef8c9bb8b7a..53a90893b81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -66,7 +66,6 @@ public class AlterJobCommand extends AlterCommand implements 
ForwardWithSync {
 
     @Override
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
-
         validate();
         AbstractJob job = analyzeAndBuildJobInfo(ctx);
         ctx.getEnv().getJobManager().alterJob(job);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index fecd457ada5..fe81921211f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -19,7 +19,9 @@ package org.apache.doris.nereids.trees.plans.commands;
 
 import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.exception.JobException;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -56,10 +58,20 @@ public class CreateJobCommand extends Command implements 
ForwardWithSync {
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        validate();
         AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx);
         Env.getCurrentEnv().getJobManager().registerJob(job);
     }
 
+    private void validate() throws JobException {
+        if (createJobInfo.streamingJob()) {
+            int streamingJobCnt = 
Env.getCurrentEnv().getJobManager().getStreamingJobCnt();
+            if (streamingJobCnt >= Config.max_streaming_job_num) {
+                throw new JobException("Exceed max streaming job num limit " + 
Config.max_streaming_job_num);
+            }
+        }
+    }
+
     @Override
     public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
         return visitor.visitCreateJobCommand(this, context);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 0d52e23ece5..4334526630e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -274,7 +274,7 @@ public class CreateJobInfo {
         LogicalPlan logicalPlan = parser.parseSingle(sql);
         if (logicalPlan instanceof InsertIntoTableCommand) {
             return new StreamingInsertJob(labelNameOptional.get(),
-                    JobStatus.RUNNING,
+                    JobStatus.PENDING,
                     currentDbName,
                     comment,
                     ConnectContext.get().getCurrentUserIdentity(),
@@ -314,4 +314,8 @@ public class CreateJobInfo {
         }
         return TimeUtils.timeStringToLong(str.trim());
     }
+
+    public boolean streamingJob() {
+        return streamingJob;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 8ffa8884dd9..a5a428c7815 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -38,6 +38,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -79,6 +80,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -536,6 +538,51 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
         }
     }
 
+    // TODO: add unit tests
+    public String getFirstTvfName() {
+        return getFirstTvfInPlan(getLogicalQuery());
+    }
+
+    private String getFirstTvfInPlan(LogicalPlan plan) {
+        if (plan instanceof UnboundTVFRelation) {
+            UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
+            return tvfRelation.getFunctionName();
+        }
+
+        for (Plan child : plan.children()) {
+            if (child instanceof LogicalPlan) {
+                String result = getFirstTvfInPlan((LogicalPlan) child);
+                if (!result.isEmpty()) {
+                    return result;
+                }
+            }
+        }
+        return "";
+    }
+
+    // TODO: add unit tests
+    public void rewriteTvfProperties(String functionName, Map<String, String> 
props) {
+        rewriteTvfInPlan(originLogicalQuery, functionName, props);
+        if (logicalQuery.isPresent()) {
+            rewriteTvfInPlan(logicalQuery.get(), functionName, props);
+        }
+    }
+
+    private void rewriteTvfInPlan(LogicalPlan plan, String functionName, 
Map<String, String> props) {
+        if (plan instanceof UnboundTVFRelation) {
+            UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
+            if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
+                tvfRelation.getProperties().getMap().putAll(props);
+            }
+        }
+
+        for (Plan child : plan.children()) {
+            if (child instanceof LogicalPlan) {
+                rewriteTvfInPlan((LogicalPlan) child, functionName, props);
+            }
+        }
+    }
+
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
         Optional<CascadesContext> analyzeContext = Optional.of(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to