This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 870a134e464 [Fix](job) fix drop db error and duplicate fragment_id when retry (#57307)
870a134e464 is described below
commit 870a134e464d995d3ca2bd1fcc98b6e005b69c8a
Author: wudi <[email protected]>
AuthorDate: Mon Oct 27 18:47:58 2025 +0800
[Fix](job) fix drop db error and duplicate fragment_id when retry (#57307)
### What problem does this PR solve?
Fixed the following issues:
1. In a cloud environment, if the database had already been deleted,
attempting to drop a job would result in an error.
2. In a cloud environment, offsets were not successfully put to the MS,
so job statistics were inaccurate.
3. When a StreamingTask timed out and was retried, a duplicate fragment
error would be reported (a condensed sketch of this fix follows below).
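In code terms, the retry fix in item 3 takes roughly the following shape. This is a condensed sketch with simplified control flow (the full version in the diff below also handles task cancellation); all names come from StreamingInsertTask:

    // The retry loop now wraps the whole attempt: before() rebuilds the
    // ConnectContext/StmtExecutor, so each retry generates a fresh plan
    // with fresh fragment ids, while the load label stays stable per
    // task and deduplicates at the transaction layer.
    public void execute() throws JobException {
        int retryCount = 0;
        while (retryCount <= MAX_RETRY) {
            try {
                before();    // new context + executor per attempt
                run();       // single attempt; throws on failure
                onSuccess();
                return;
            } catch (Exception e) {
                if (++retryCount > MAX_RETRY) {
                    onFail(e.getMessage());
                    return;
                }
            } finally {
                closeOrReleaseResources();
            }
        }
    }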
---
.../insert/streaming/StreamingInsertJob.java | 54 ++++++++---
.../insert/streaming/StreamingInsertTask.java | 100 +++++++++++----------
.../streaming/StreamingJobSchedulerTask.java | 7 ++
.../org/apache/doris/job/manager/JobManager.java | 21 ++---
.../plans/commands/insert/OlapInsertExecutor.java | 11 ++-
.../streaming_job/test_streaming_insert_job.groovy | 2 +-
.../test_streaming_insert_job_drop.groovy | 97 ++++++++++++++++++++
.../test_streaming_insert_job_task_retry.groovy | 92 +++++++++++++++++++
8 files changed, 317 insertions(+), 67 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index feb3254c546..c1c9a77b1c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -89,7 +89,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@Log4j2
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
TxnStateChangeCallback, GsonPostProcessable {
- private final long dbId;
+ private long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@Getter
@Setter
@@ -129,7 +129,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
Map<String, String> properties) {
super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql);
- this.dbId = ConnectContext.get().getCurrentDbId();
this.properties = properties;
init();
}
@@ -246,11 +245,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
try {
super.updateJobStatus(status);
if (JobStatus.PAUSED.equals(getJobStatus())) {
- clearRunningStreamTask();
+ clearRunningStreamTask(status);
}
if (isFinalStatus()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
}
+ log.info("Streaming insert job {} update status to {}",
getJobId(), getJobStatus());
} finally {
lock.writeLock().unlock();
}
@@ -296,6 +296,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
+ log.info("create new streaming insert task for job {}, task {} ",
+ getJobId(), runningStreamTask.getTaskId());
return runningStreamTask;
}
@@ -314,10 +316,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING));
}
- public void clearRunningStreamTask() {
+ public void clearRunningStreamTask(JobStatus newJobStatus) {
if (runningStreamTask != null) {
+ log.info("clear running streaming insert task for job {}, task {},
status {} ",
+ getJobId(), runningStreamTask.getTaskId(),
runningStreamTask.getStatus());
+ runningStreamTask.cancel(JobStatus.STOPPED.equals(newJobStatus) ?
false : true);
runningStreamTask.closeOrReleaseResources();
- runningStreamTask = null;
}
}
@@ -325,7 +329,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
return offsetProvider.hasMoreDataToConsume();
}
-
@Override
public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
if (task.getErrMsg() != null) {
@@ -358,6 +361,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
StreamingInsertTask nextTask = createStreamingInsertTask();
this.runningStreamTask = nextTask;
+ log.info("Streaming insert job {} create next streaming insert
task {} after task {} success",
+ getJobId(), nextTask.getTaskId(), task.getTaskId());
} finally {
writeUnlock();
}
@@ -374,6 +379,17 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
}
+ private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+ if (this.jobStatistic == null) {
+ this.jobStatistic = new StreamingJobStatistic();
+ }
+ this.jobStatistic.setScannedRows(attachment.getScannedRows());
+ this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
+ this.jobStatistic.setFileNumber(attachment.getNumFiles());
+ this.jobStatistic.setFileSize(attachment.getFileBytes());
+ offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+ }
+
@Override
public void onRegister() throws JobException {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
@@ -555,6 +571,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
boolean shouldReleaseLock = false;
writeLock();
try {
+ if (runningStreamTask.getIsCanceled().get()) {
+ log.info("streaming insert job {} task {} is canceled, skip
beforeCommitted",
+ getJobId(), runningStreamTask.getTaskId());
+ return;
+ }
+
ArrayList<Long> taskIds = new ArrayList<>();
taskIds.add(runningStreamTask.getTaskId());
// todo: Check whether the taskid of runningtask is consistent with the taskid associated with txn
@@ -601,14 +623,26 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
succeedTaskCount.incrementAndGet();
}
+ public long getDbId() {
+ if (dbId <= 0) {
+ try {
+ this.dbId = Env.getCurrentInternalCatalog().getDbOrAnalysisException(getCurrentDbName()).getId();
+ } catch (AnalysisException e) {
+ log.warn("failed to get db id for streaming insert job {}, db
name: {}, msg: {}",
+ getJobId(), getCurrentDbName(), e.getMessage());
+ }
+ }
+ return dbId;
+ }
+
public void replayOnCloudMode() throws JobException {
Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
- builder.setDbId(dbId);
+ builder.setDbId(getDbId());
builder.setJobId(getJobId());
- Cloud.GetStreamingTaskCommitAttachResponse response;
+ Cloud.GetStreamingTaskCommitAttachResponse response = null;
try {
response =
MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
@@ -621,13 +655,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
}
} catch (RpcException e) {
- log.info("failed to get streaming task commit attach {}", e);
+ log.info("failed to get streaming task commit attach {}",
response, e);
throw new JobException(e.getMessage());
}
StreamingTaskTxnCommitAttachment commitAttach =
new StreamingTaskTxnCommitAttachment(response.getCommitAttach());
- updateJobStatisticAndOffset(commitAttach);
+ updateCloudJobStatisticAndOffset(commitAttach);
}
@Override
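Net effect of the StreamingInsertJob changes above: dbId is no longer a final value captured from ConnectContext at creation time but a lazily resolved field. A condensed sketch of the lookup, simplified from the getDbId() hunk above:

    // Resolved from the db name on first use, so code paths without a
    // ConnectContext (edit-log replay, cloud replay) still get a usable
    // id, and an already-dropped db degrades to a warning rather than
    // failing the drop path.
    public long getDbId() {
        if (dbId <= 0) {
            try {
                this.dbId = Env.getCurrentInternalCatalog()
                        .getDbOrAnalysisException(getCurrentDbName()).getId();
            } catch (AnalysisException e) {
                log.warn("failed to get db id for streaming insert job {}", getJobId(), e);
            }
        }
        return dbId;
    }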
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index ef127b9ca98..d9fa4b918bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.job.offset.s3.S3Offset;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
@@ -67,10 +68,12 @@ public class StreamingInsertTask {
private UserIdentity userIdentity;
private ConnectContext ctx;
private Offset runningOffset;
+ @Getter
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;
private Map<String, String> originTvfProps;
SourceOffsetProvider offsetProvider;
+ private int retryCount = 0;
public StreamingInsertTask(long jobId,
long taskId,
@@ -93,22 +96,32 @@ public class StreamingInsertTask {
}
public void execute() throws JobException {
- try {
- before();
- run();
- onSuccess();
- } catch (Exception e) {
- if (TaskStatus.CANCELED.equals(status)) {
+ while (retryCount <= MAX_RETRY) {
+ try {
+ before();
+ run();
+ onSuccess();
return;
- }
- log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
- onFail(e.getMessage());
- } finally {
- // The cancel logic will call the closeOrReleaseResources method by itself.
- // If it is also called here,
- // it may result in the inability to obtain relevant information when canceling the task
- if (!TaskStatus.CANCELED.equals(status)) {
- closeOrReleaseResources();
+ } catch (Exception e) {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
+ this.errMsg = e.getMessage();
+ retryCount++;
+ if (retryCount > MAX_RETRY) {
+ log.error("Task execution failed after {} retries.",
MAX_RETRY, e);
+ onFail(e.getMessage());
+ return;
+ }
+ log.warn("execute streaming task error, job id is {}, task id
is {}, retrying {}/{}: {}",
+ jobId, taskId, retryCount, MAX_RETRY, e.getMessage());
+ } finally {
+ // The cancel logic will call the closeOrReleaseResources method by itself.
+ // If it is also called here,
+ // it may result in the inability to obtain relevant information when canceling the task
+ if (!TaskStatus.CANCELED.equals(status)) {
+ closeOrReleaseResources();
+ }
}
}
}
@@ -118,7 +131,8 @@ public class StreamingInsertTask {
this.startTimeMs = System.currentTimeMillis();
if (isCanceled.get()) {
- throw new JobException("Streaming insert task has been canceled,
task id: {}", getTaskId());
+ log.info("streaming insert task has been canceled, task id is {}",
getTaskId());
+ return;
}
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
ctx.setSessionVariable(jobProperties.getSessionVariable());
@@ -135,42 +149,36 @@ public class StreamingInsertTask {
throw new JobException("Can not get Parsed plan");
}
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
- this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
+ this.taskCommand.setLabelName(Optional.of(labelName));
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
}
private void run() throws JobException {
+ StreamingInsertJob job =
+ (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(getJobId());
+ StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
+ log.info("current running stream task id is {} for job id {}",
+ runningStreamTask == null ? -1 : runningStreamTask.getTaskId(), getJobId());
+ if (isCanceled.get()) {
+ log.info("task has been canceled, task id is {}", getTaskId());
+ return;
+ }
+ log.info("start to run streaming insert task, label {}, offset is {},
filepath {}",
+ labelName, runningOffset.toString(), ((S3Offset)
runningOffset).getFileLists());
String errMsg = null;
- int retry = 0;
- while (retry <= MAX_RETRY) {
- try {
- if (isCanceled.get()) {
- log.info("task has been canceled, task id is {}",
getTaskId());
- return;
- }
- taskCommand.run(ctx, stmtExecutor);
- if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
- return;
- } else {
- errMsg = ctx.getState().getErrorMessage();
- }
- log.error(
- "streaming insert failed with {}, reason {}, to retry",
- taskCommand.getLabelName(),
- errMsg);
- if (retry == MAX_RETRY) {
- errMsg = "reached max retry times, failed with " + errMsg;
- }
- } catch (Exception e) {
- log.warn("execute insert task error, label is {},offset is
{}", taskCommand.getLabelName(),
- runningOffset.toString(), e);
- errMsg = Util.getRootCauseMessage(e);
+ try {
+ taskCommand.run(ctx, stmtExecutor);
+ if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
+ return;
+ } else {
+ errMsg = ctx.getState().getErrorMessage();
}
- retry++;
+ throw new JobException(errMsg);
+ } catch (Exception e) {
+ log.warn("execute insert task error, label is {},offset is {}",
taskCommand.getLabelName(),
+ runningOffset.toString(), e);
+ throw new JobException(Util.getRootCauseMessage(e));
}
- log.error("streaming insert task failed, job id is {}, task id is {},
offset is {}, errMsg is {}",
- getJobId(), getTaskId(), runningOffset.toString(), errMsg);
- throw new JobException(errMsg);
}
public boolean onSuccess() throws JobException {
@@ -218,6 +226,8 @@ public class StreamingInsertTask {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
+ log.info("cancelling streaming insert task, job id is {}, task id
is {}",
+ getJobId(), getTaskId());
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming insert task cancelled"),
needWaitCancelComplete);
}
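The task-side hunks above also change cancellation from throwing an exception to a quiet early return. A small self-contained sketch of that cooperative-cancel pattern (hypothetical class; the AtomicBoolean mirrors the isCanceled flag in the diff):

    import java.util.concurrent.atomic.AtomicBoolean;

    class CancellableTaskSketch {
        private final AtomicBoolean isCanceled = new AtomicBoolean(false);

        void cancel() {
            if (isCanceled.getAndSet(true)) {
                return; // only the first cancel does real work
            }
            // interrupt the underlying executor here
        }

        void runStage(String stage) {
            if (isCanceled.get()) {
                // quiet early return instead of an exception, as in before()/run() above
                System.out.println("skip " + stage + ": task canceled");
                return;
            }
            System.out.println("run " + stage);
        }

        public static void main(String[] args) {
            CancellableTaskSketch t = new CancellableTaskSketch();
            t.runStage("before"); // runs
            t.cancel();
            t.runStage("run");    // skipped quietly
        }
    }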
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 503bc9e631f..a7a26596e62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -30,11 +30,13 @@ import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.List;
+@Log4j2
public class StreamingJobSchedulerTask extends AbstractTask {
private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
@@ -108,6 +110,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
@Override
protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception {
+ log.info("cancelling streaming insert job scheduler task for job id
{}", streamingInsertJob.getJobId());
if (streamingInsertJob.getRunningStreamTask() != null) {
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
}
@@ -119,6 +122,10 @@ public class StreamingJobSchedulerTask extends AbstractTask {
if (runningTask == null) {
return null;
}
+ if (!streamingInsertJob.needScheduleTask()) {
+ //todo: should list history task
+ return null;
+ }
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 21b84ffaf65..95c38e0fa06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -51,6 +51,7 @@ import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
import com.google.common.collect.Lists;
import lombok.Getter;
@@ -241,25 +242,25 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
private void deleteStremingJob(AbstractJob<?, C> job) throws JobException {
- boolean isStreamingJob = job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING);
- if (!(Config.isCloudMode() && isStreamingJob)) {
+ if (!(Config.isCloudMode() && job instanceof StreamingInsertJob)) {
return;
}
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ Cloud.DeleteStreamingJobResponse resp = null;
try {
- long dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(job.getCurrentDbName()).getId();
Cloud.DeleteStreamingJobRequest req =
Cloud.DeleteStreamingJobRequest.newBuilder()
.setCloudUniqueId(Config.cloud_unique_id)
- .setDbId(dbId)
+ .setDbId(streamingJob.getDbId())
.setJobId(job.getJobId())
.build();
- Cloud.DeleteStreamingJobResponse resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
+ resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
- throw new JobException("deleteJobKey failed for jobId={},
dbId={}, status={}",
- job.getJobId(), dbId, resp.getStatus());
+ log.warn("failed to delete streaming job, response: {}", resp);
+ throw new JobException("deleteJobKey failed for jobId=%s,
dbId=%s, status=%s",
+ job.getJobId(), job.getJobId(), resp.getStatus());
}
- } catch (Exception e) {
- throw new JobException("deleteJobKey exception for jobId={},
dbName={}",
- job.getJobId(), job.getCurrentDbName(), e);
+ } catch (RpcException e) {
+ log.warn("failed to delete streaming job {}", resp, e);
}
}
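The JobManager change narrows what counts as fatal when dropping a cloud streaming job. Condensed from the hunk above (request construction elided):

    // Only a non-OK meta-service status fails the drop; a transport-level
    // RpcException is logged and swallowed, so DROP JOB still succeeds
    // when the database and its meta-service state are already gone.
    Cloud.DeleteStreamingJobResponse resp = null;
    try {
        resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
        if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
            log.warn("failed to delete streaming job, response: {}", resp);
            throw new JobException("deleteJobKey failed for jobId=%s, dbId=%s, status=%s",
                    job.getJobId(), streamingJob.getDbId(), resp.getStatus());
        }
    } catch (RpcException e) {
        log.warn("failed to delete streaming job {}", resp, e); // tolerated; the drop proceeds
    }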
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index de717102ea7..7fceb289ef1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -102,12 +102,21 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
if (DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
throw new BeginTransactionException("current running txns on db is larger than limit");
}
+ LoadJobSourceType loadJobSourceType = LoadJobSourceType.INSERT_STREAMING;
+ StreamingInsertTask streamingInsertTask = Env.getCurrentEnv()
+ .getJobManager()
+ .getStreamingTaskManager()
+ .getStreamingInsertTaskById(jobId);
+
+ if (streamingInsertTask != null) {
+ loadJobSourceType = LoadJobSourceType.STREAMING_JOB;
+ }
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, 0, FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
- LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
+ loadJobSourceType, ctx.getExecTimeoutS());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " +
e.getMessage(), e);
}
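The OlapInsertExecutor change selects the transaction source type by checking whether a registered streaming insert task owns this load. Condensed from the hunk above (jobId is assumed, as the surrounding code suggests, to have been derived from the label earlier in the method):

    // Default stays INSERT_STREAMING; only transactions begun by a
    // registered streaming insert task are tagged STREAMING_JOB, so the
    // streaming job's transaction callback can recognize its own commits.
    LoadJobSourceType loadJobSourceType = LoadJobSourceType.INSERT_STREAMING;
    StreamingInsertTask streamingInsertTask = Env.getCurrentEnv()
            .getJobManager()
            .getStreamingTaskManager()
            .getStreamingInsertTaskById(jobId);
    if (streamingInsertTask != null) {
        loadJobSourceType = LoadJobSourceType.STREAMING_JOB;
    }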
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 66c46a5e57d..93dc64dfd4e 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -65,7 +65,7 @@ suite("test_streaming_insert_job") {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
- def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger than 2
jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy
new file mode 100644
index 00000000000..4d96102a1a3
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_drop.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_drop") {
+ def tableName = "test_streaming_insert_job_tbl_drop"
+ def jobName = "test_streaming_insert_job_name_drop"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+ def dropDatabase = "test_streaming_insert_job_drop_db"
+ sql """create database if not exists ${dropDatabase}"""
+ sql """use ${dropDatabase}"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // create streaming job
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.max_batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("show success job: " + jobResult)
+
+ // drop database
+ sql """drop database ${dropDatabase} force"""
+
+ // drop job
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy
new file mode 100644
index 00000000000..0a8c47448ee
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_task_retry", 'nonConcurrent') {
+ def tableName = "test_streaming_insert_job_tbl_task_retry"
+ def jobName = "test_streaming_insert_job_name_task_retry"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // create streaming job
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "session.insert_timeout" = "1",
+ "session.query_timeout" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT c1, c2, sleep(10) FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobStatus = sql """ select Status,FailedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobStatus: " + jobStatus)
+ jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && '1' == jobStatus.get(0).get(1)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("jobErrorMsg: " + jobErrorMsg)
+ assert jobErrorMsg.get(0).get(0).contains("Execute timeout")
+
+ // drop job
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]