This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 870a134e464 [Fix](job) fix drop db error and duplicate fragment_id when retry (#57307)
870a134e464 is described below

commit 870a134e464d995d3ca2bd1fcc98b6e005b69c8a
Author: wudi <[email protected]>
AuthorDate: Mon Oct 27 18:47:58 2025 +0800

    [Fix](job) fix drop db error and duplicate fragment_id when retry (#57307)
    
    ### What problem does this PR solve?
    Fixed the following issues:
    1. In a cloud environment, if the database had already been deleted, attempting
    to drop a job would result in an error.
    2. In a cloud environment, offsets were not successfully persisted to the
    Meta Service (MS), so job statistics were inaccurate.
    3. When a StreamingTask timed out and was retried, a duplicate fragment
    error would be reported. (An illustrative sketch of the fixed patterns follows this list.)
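    
    Below is a minimal, hedged Java sketch of the two patterns this fix relies on:
    lazy db-id resolution that tolerates an already-dropped database (issue 1), and
    task-level retry that rebuilds the execution context on every attempt so fragment
    ids are never reused (issue 3). All names here (`StreamingRetrySketch`,
    `lookupDbIdByName`, `prepareAttempt`, `runOnce`, `releaseAttemptResources`) are
    hypothetical stand-ins for illustration, not the Doris API.
    
    ```java
    // Hypothetical sketch only; names are illustrative, not Doris code.
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class StreamingRetrySketch {
        private static final int MAX_RETRY = 3;
        private final AtomicBoolean isCanceled = new AtomicBoolean(false);
        private long dbId; // 0 means "not resolved yet"
    
        // Issue 1: resolve the db id lazily instead of at job creation, and
        // tolerate a missing database so the job can still be dropped.
        long getDbId() {
            if (dbId <= 0) {
                try {
                    dbId = lookupDbIdByName("db_name"); // hypothetical lookup
                } catch (Exception e) {
                    // db may already be dropped; fall through and return 0
                }
            }
            return dbId;
        }
    
        // Issue 3: retry the whole attempt (plan + execute), not just the run
        // step, so every attempt gets freshly generated fragment ids.
        void execute() throws Exception {
            int retryCount = 0;
            while (retryCount <= MAX_RETRY) {
                try {
                    prepareAttempt(); // rebuild plan/executor for this attempt
                    runOnce();        // single execution attempt
                    return;
                } catch (Exception e) {
                    if (isCanceled.get()) {
                        return;       // canceled tasks are never retried
                    }
                    retryCount++;
                    if (retryCount > MAX_RETRY) {
                        throw e;      // retries exhausted, surface the failure
                    }
                } finally {
                    releaseAttemptResources(); // close per-attempt state
                }
            }
        }
    
        private long lookupDbIdByName(String name) { return 1L; }
        private void prepareAttempt() { }
        private void runOnce() throws Exception { }
        private void releaseAttemptResources() { }
    }
    ```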
---
 .../insert/streaming/StreamingInsertJob.java       |  54 ++++++++---
 .../insert/streaming/StreamingInsertTask.java      | 100 +++++++++++----------
 .../streaming/StreamingJobSchedulerTask.java       |   7 ++
 .../org/apache/doris/job/manager/JobManager.java   |  21 ++---
 .../plans/commands/insert/OlapInsertExecutor.java  |  11 ++-
 .../streaming_job/test_streaming_insert_job.groovy |   2 +-
 .../test_streaming_insert_job_drop.groovy          |  97 ++++++++++++++++++++
 .../test_streaming_insert_job_task_retry.groovy    |  92 +++++++++++++++++++
 8 files changed, 317 insertions(+), 67 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index feb3254c546..c1c9a77b1c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -89,7 +89,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 @Log4j2
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
         TxnStateChangeCallback, GsonPostProcessable {
-    private final long dbId;
+    private long dbId;
     private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
     @Getter
     @Setter
@@ -129,7 +129,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
             Map<String, String> properties) {
         super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, dbName, comment, createUser,
                 jobConfig, createTimeMs, executeSql);
-        this.dbId = ConnectContext.get().getCurrentDbId();
         this.properties = properties;
         init();
     }
@@ -246,11 +245,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         try {
             super.updateJobStatus(status);
             if (JobStatus.PAUSED.equals(getJobStatus())) {
-                clearRunningStreamTask();
+                clearRunningStreamTask(status);
             }
             if (isFinalStatus()) {
                 Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
             }
+            log.info("Streaming insert job {} update status to {}", 
getJobId(), getJobStatus());
         } finally {
             lock.writeLock().unlock();
         }
@@ -296,6 +296,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
                 offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
+        log.info("create new streaming insert task for job {}, task {} ",
+                getJobId(), runningStreamTask.getTaskId());
         return runningStreamTask;
     }
 
@@ -314,10 +316,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING));
     }
 
-    public void clearRunningStreamTask() {
+    public void clearRunningStreamTask(JobStatus newJobStatus) {
         if (runningStreamTask != null) {
+            log.info("clear running streaming insert task for job {}, task {}, 
status {} ",
+                    getJobId(), runningStreamTask.getTaskId(), 
runningStreamTask.getStatus());
+            runningStreamTask.cancel(JobStatus.STOPPED.equals(newJobStatus) ? 
false : true);
             runningStreamTask.closeOrReleaseResources();
-            runningStreamTask = null;
         }
     }
 
@@ -325,7 +329,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         return offsetProvider.hasMoreDataToConsume();
     }
 
-
     @Override
     public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
         if (task.getErrMsg() != null) {
@@ -358,6 +361,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
             Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             StreamingInsertTask nextTask = createStreamingInsertTask();
             this.runningStreamTask = nextTask;
+            log.info("Streaming insert job {} create next streaming insert 
task {} after task {} success",
+                    getJobId(), nextTask.getTaskId(), task.getTaskId());
         } finally {
             writeUnlock();
         }
@@ -374,6 +379,17 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
     }
 
+    private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+        if (this.jobStatistic == null) {
+            this.jobStatistic = new StreamingJobStatistic();
+        }
+        this.jobStatistic.setScannedRows(attachment.getScannedRows());
+        this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
+        this.jobStatistic.setFileNumber(attachment.getNumFiles());
+        this.jobStatistic.setFileSize(attachment.getFileBytes());
+        offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+    }
+
     @Override
     public void onRegister() throws JobException {
         Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
@@ -555,6 +571,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         boolean shouldReleaseLock = false;
         writeLock();
         try {
+            if (runningStreamTask.getIsCanceled().get()) {
+                log.info("streaming insert job {} task {} is canceled, skip 
beforeCommitted",
+                        getJobId(), runningStreamTask.getTaskId());
+                return;
+            }
+
             ArrayList<Long> taskIds = new ArrayList<>();
             taskIds.add(runningStreamTask.getTaskId());
             // todo: Check whether the taskid of runningtask is consistent with the taskid associated with txn
@@ -601,14 +623,26 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         succeedTaskCount.incrementAndGet();
     }
 
+    public long getDbId() {
+        if (dbId <= 0) {
+            try {
+                this.dbId = Env.getCurrentInternalCatalog().getDbOrAnalysisException(getCurrentDbName()).getId();
+            } catch (AnalysisException e) {
+                log.warn("failed to get db id for streaming insert job {}, db 
name: {}, msg: {}",
+                        getJobId(), getCurrentDbName(), e.getMessage());
+            }
+        }
+        return dbId;
+    }
+
     public void replayOnCloudMode() throws JobException {
         Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
                 Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
         builder.setCloudUniqueId(Config.cloud_unique_id);
-        builder.setDbId(dbId);
+        builder.setDbId(getDbId());
         builder.setJobId(getJobId());
 
-        Cloud.GetStreamingTaskCommitAttachResponse response;
+        Cloud.GetStreamingTaskCommitAttachResponse response = null;
         try {
             response = MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(builder.build());
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
@@ -621,13 +655,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
                 }
             }
         } catch (RpcException e) {
-            log.info("failed to get streaming task commit attach {}", e);
+            log.info("failed to get streaming task commit attach {}", 
response, e);
             throw new JobException(e.getMessage());
         }
 
         StreamingTaskTxnCommitAttachment commitAttach =
                 new StreamingTaskTxnCommitAttachment(response.getCommitAttach());
-        updateJobStatisticAndOffset(commitAttach);
+        updateCloudJobStatisticAndOffset(commitAttach);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index ef127b9ca98..d9fa4b918bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.job.offset.s3.S3Offset;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -67,10 +68,12 @@ public class StreamingInsertTask {
     private UserIdentity userIdentity;
     private ConnectContext ctx;
     private Offset runningOffset;
+    @Getter
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
     private StreamingJobProperties jobProperties;
     private Map<String, String> originTvfProps;
     SourceOffsetProvider offsetProvider;
+    private int retryCount = 0;
 
     public StreamingInsertTask(long jobId,
                                long taskId,
@@ -93,22 +96,32 @@ public class StreamingInsertTask {
     }
 
     public void execute() throws JobException {
-        try {
-            before();
-            run();
-            onSuccess();
-        } catch (Exception e) {
-            if (TaskStatus.CANCELED.equals(status)) {
+        while (retryCount <= MAX_RETRY) {
+            try {
+                before();
+                run();
+                onSuccess();
                 return;
-            }
-            log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
-            onFail(e.getMessage());
-        } finally {
-            // The cancel logic will call the closeOrReleased Resources method by itself.
-            // If it is also called here,
-            // it may result in the inability to obtain relevant information when canceling the task
-            if (!TaskStatus.CANCELED.equals(status)) {
-                closeOrReleaseResources();
+            } catch (Exception e) {
+                if (TaskStatus.CANCELED.equals(status)) {
+                    return;
+                }
+                this.errMsg = e.getMessage();
+                retryCount++;
+                if (retryCount > MAX_RETRY) {
+                    log.error("Task execution failed after {} retries.", 
MAX_RETRY, e);
+                    onFail(e.getMessage());
+                    return;
+                }
+                log.warn("execute streaming task error, job id is {}, task id 
is {}, retrying {}/{}: {}",
+                        jobId, taskId, retryCount, MAX_RETRY, e.getMessage());
+            } finally {
+                // The cancel logic will call the closeOrReleased Resources method by itself.
+                // If it is also called here,
+                // it may result in the inability to obtain relevant information when canceling the task
+                if (!TaskStatus.CANCELED.equals(status)) {
+                    closeOrReleaseResources();
+                }
             }
         }
     }
@@ -118,7 +131,8 @@ public class StreamingInsertTask {
         this.startTimeMs = System.currentTimeMillis();
 
         if (isCanceled.get()) {
-            throw new JobException("Streaming insert task has been canceled, 
task id: {}", getTaskId());
+            log.info("streaming insert task has been canceled, task id is {}", 
getTaskId());
+            return;
         }
         ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
         ctx.setSessionVariable(jobProperties.getSessionVariable());
@@ -135,42 +149,36 @@ public class StreamingInsertTask {
             throw new JobException("Can not get Parsed plan");
         }
         this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
-        this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
+        this.taskCommand.setLabelName(Optional.of(labelName));
         this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
     }
 
     private void run() throws JobException {
+        StreamingInsertJob job =
+                (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(getJobId());
+        StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
+        log.info("current running stream task id is {} for job id {}",
+                runningStreamTask == null ? -1 : runningStreamTask.getTaskId(), getJobId());
+        if (isCanceled.get()) {
+            log.info("task has been canceled, task id is {}", getTaskId());
+            return;
+        }
+        log.info("start to run streaming insert task, label {}, offset is {}, 
filepath {}",
+                labelName, runningOffset.toString(), ((S3Offset) 
runningOffset).getFileLists());
         String errMsg = null;
-        int retry = 0;
-        while (retry <= MAX_RETRY) {
-            try {
-                if (isCanceled.get()) {
-                    log.info("task has been canceled, task id is {}", 
getTaskId());
-                    return;
-                }
-                taskCommand.run(ctx, stmtExecutor);
-                if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
-                    return;
-                } else {
-                    errMsg = ctx.getState().getErrorMessage();
-                }
-                log.error(
-                        "streaming insert failed with {}, reason {}, to retry",
-                        taskCommand.getLabelName(),
-                        errMsg);
-                if (retry == MAX_RETRY) {
-                    errMsg = "reached max retry times, failed with " + errMsg;
-                }
-            } catch (Exception e) {
-                log.warn("execute insert task error, label is {},offset is 
{}", taskCommand.getLabelName(),
-                         runningOffset.toString(), e);
-                errMsg = Util.getRootCauseMessage(e);
+        try {
+            taskCommand.run(ctx, stmtExecutor);
+            if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
+                return;
+            } else {
+                errMsg = ctx.getState().getErrorMessage();
             }
-            retry++;
+            throw new JobException(errMsg);
+        } catch (Exception e) {
+            log.warn("execute insert task error, label is {},offset is {}", 
taskCommand.getLabelName(),
+                    runningOffset.toString(), e);
+            throw new JobException(Util.getRootCauseMessage(e));
         }
-        log.error("streaming insert task failed, job id is {}, task id is {}, 
offset is {}, errMsg is {}",
-                getJobId(), getTaskId(), runningOffset.toString(), errMsg);
-        throw new JobException(errMsg);
     }
 
     public boolean onSuccess() throws JobException {
@@ -218,6 +226,8 @@ public class StreamingInsertTask {
         }
         isCanceled.getAndSet(true);
         if (null != stmtExecutor) {
+            log.info("cancelling streaming insert task, job id is {}, task id 
is {}",
+                    getJobId(), getTaskId());
             stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming insert task cancelled"),
                     needWaitCancelComplete);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 503bc9e631f..a7a26596e62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -30,11 +30,13 @@ import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.List;
 
+@Log4j2
 public class StreamingJobSchedulerTask extends AbstractTask {
     private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
     private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
@@ -108,6 +110,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
 
     @Override
     protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception {
+        log.info("cancelling streaming insert job scheduler task for job id {}", streamingInsertJob.getJobId());
         if (streamingInsertJob.getRunningStreamTask() != null) {
             streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
         }
@@ -119,6 +122,10 @@ public class StreamingJobSchedulerTask extends AbstractTask {
         if (runningTask == null) {
             return null;
         }
+        if (!streamingInsertJob.needScheduleTask()) {
+            //todo: should list history task
+            return null;
+        }
         TRow trow = new TRow();
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 21b84ffaf65..95c38e0fa06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -51,6 +51,7 @@ import org.apache.doris.nereids.trees.expressions.And;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
 
 import com.google.common.collect.Lists;
 import lombok.Getter;
@@ -241,25 +242,25 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
     }
 
     private void deleteStremingJob(AbstractJob<?, C> job) throws JobException {
-        boolean isStreamingJob = job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING);
-        if (!(Config.isCloudMode() && isStreamingJob)) {
+        if (!(Config.isCloudMode() && job instanceof StreamingInsertJob)) {
             return;
         }
+        StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+        Cloud.DeleteStreamingJobResponse resp = null;
         try {
-            long dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(job.getCurrentDbName()).getId();
             Cloud.DeleteStreamingJobRequest req = Cloud.DeleteStreamingJobRequest.newBuilder()
                     .setCloudUniqueId(Config.cloud_unique_id)
-                    .setDbId(dbId)
+                    .setDbId(streamingJob.getDbId())
                     .setJobId(job.getJobId())
                     .build();
-            Cloud.DeleteStreamingJobResponse resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
+            resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
             if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
-                throw new JobException("deleteJobKey failed for jobId={}, 
dbId={}, status={}",
-                        job.getJobId(), dbId, resp.getStatus());
+                log.warn("failed to delete streaming job, response: {}", resp);
+                throw new JobException("deleteJobKey failed for jobId=%s, 
dbId=%s, status=%s",
+                        job.getJobId(), job.getJobId(), resp.getStatus());
             }
-        } catch (Exception e) {
-            throw new JobException("deleteJobKey exception for jobId={}, 
dbName={}",
-                    job.getJobId(), job.getCurrentDbName(), e);
+        } catch (RpcException e) {
+            log.warn("failed to delete streaming job {}", resp, e);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index de717102ea7..7fceb289ef1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -102,12 +102,21 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
             if (DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
                 throw new BeginTransactionException("current running txns on db is larger than limit");
             }
+            LoadJobSourceType loadJobSourceType = LoadJobSourceType.INSERT_STREAMING;
+            StreamingInsertTask streamingInsertTask = Env.getCurrentEnv()
+                    .getJobManager()
+                    .getStreamingTaskManager()
+                    .getStreamingInsertTaskById(jobId);
+
+            if (streamingInsertTask != null) {
+                loadJobSourceType = LoadJobSourceType.STREAMING_JOB;
+            }
             this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                     database.getId(), ImmutableList.of(table.getId()), labelName,
                     new TxnCoordinator(TxnSourceType.FE, 0,
                             FrontendOptions.getLocalHostAddress(),
                             ExecuteEnv.getInstance().getStartupTime()),
-                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
+                    loadJobSourceType, ctx.getExecTimeoutS());
         } catch (Exception e) {
             throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
         }
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 66c46a5e57d..93dc64dfd4e 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -65,7 +65,7 @@ suite("test_streaming_insert_job") {
         Awaitility.await().atMost(300, SECONDS)
                 .pollInterval(1, SECONDS).until(
                 {
-                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                     log.info("jobSuccendCount: " + jobSuccendCount)
                     // check job status and succeed task count larger than 2
                     jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy
new file mode 100644
index 00000000000..4d96102a1a3
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_drop") {
+    def tableName = "test_streaming_insert_job_tbl_drop"
+    def jobName = "test_streaming_insert_job_name_drop"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+    def dropDatabase = "test_streaming_insert_job_drop_db"
+    sql """create database if not exists ${dropDatabase}"""
+    sql """use ${dropDatabase}"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // create streaming job
+    sql """
+       CREATE JOB ${jobName}  
+       PROPERTIES(
+        "s3.max_batch_files" = "1"
+       )
+       ON STREAMING DO INSERT INTO ${tableName} 
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobSuccendCount: " + jobSuccendCount)
+                    // check job status and succeed task count larger than 2
+                    jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+    log.info("show success job: " + jobResult)
+
+    // drop database
+    sql """drop database ${dropDatabase} force"""
+
+    // drop job
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy
new file mode 100644
index 00000000000..0a8c47448ee
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_task_retry", 'nonConcurrent') {
+    def tableName = "test_streaming_insert_job_tbl_task_retry"
+    def jobName = "test_streaming_insert_job_name_task_retry"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // create streaming job
+    sql """
+       CREATE JOB ${jobName}  
+       PROPERTIES(
+        "session.insert_timeout" = "1",
+        "session.query_timeout" = "1"
+       )
+       ON STREAMING DO INSERT INTO ${tableName} 
+       SELECT c1, c2, sleep(10) FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobStatus = sql """ select Status,FailedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobStatus: " + jobStatus)
+                    jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && '1' == jobStatus.get(0).get(1)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
+    log.info("jobErrorMsg: " + jobErrorMsg)
+    assert jobErrorMsg.get(0).get(0).contains("Execute timeout")
+
+    // drop job
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
