This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new db7cb6c4f5a branch-4.0: [test](streaming job) add streaming job auto
resume test #56881 (#56894)
db7cb6c4f5a is described below
commit db7cb6c4f5a9068901c542a176a15766bc196416
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 14 11:34:10 2025 +0800
branch-4.0: [test](streaming job) add streaming job auto resume test #56881
(#56894)
Cherry-picked from #56881
Co-authored-by: hui lai <[email protected]>
---
.../test_streaming_job_auto_resume.groovy | 113 +++++++++++++++++++++
1 file changed, 113 insertions(+)
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_auto_resume.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_auto_resume.groovy
new file mode 100644
index 00000000000..8506d08f17b
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_auto_resume.groovy
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_job_auto_resume", "nonConcurrent") {
+ def tableName = "test_streaming_job_auto_resume_tbl"
+ def jobName = "test_streaming_job_auto_resume"
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("FlushToken.submit_flush_error")
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.max_batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ try {
+ Awaitility.await().atMost(100, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def count = sql """ select status, FailedTaskCount
from jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
+ log.info("FailedTaskCount: " + count)
+ count.size() == 1 && '0' < count.get(1).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ }
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
+ }
+
+ // case1: test job fault recovery
+ try {
+ Awaitility.await().atMost(100, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ // case2: test user pause will not auto resume
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ def count = 0
+ while(count < 10) {
+ def pausedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert pausedJobStatus.get(0).get(0) == "PAUSED"
+ sleep(1000)
+ count++
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]