This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9ecfd409603 [fix](job) fix StreamingJob loadStatistic reset to zero
after FE checkpoint restart (#61997)
9ecfd409603 is described below
commit 9ecfd4096034a2ae24df33be1d58b2b7e78daac2
Author: wudi <[email protected]>
AuthorDate: Thu Apr 2 17:28:24 2026 +0800
[fix](job) fix StreamingJob loadStatistic reset to zero after FE checkpoint
restart (#61997)
### What problem does this PR solve?
Problem Summary:
1. When FE restarts from a checkpoint image, `replayOnCommitted()` is
not called
for transactions committed before the checkpoint. Since `jobStatistic`
in
`StreamingInsertJob` lacked `@SerializedName`, it was not written into
the
image, causing `scannedRows` and `loadBytes` to reset to zero after
every
checkpoint-based restart.
2. Add a restart test case for the CDC TVF.
#### Root Cause
`jobStatistic` accumulates statistics via `+=` in `replayOnCommitted()`.
Unlike `offset` (which is an assignment and idempotent), accumulated
values
cannot be recovered from post-checkpoint txn replay alone. The field
must be
persisted in the image to survive checkpoint restarts.
---
.../insert/streaming/StreamingInsertJob.java | 2 +
...eaming_job_cdc_stream_postgres_pause_resume.out | 2 +-
...reaming_job_cdc_stream_postgres_restart_fe.out} | 9 +-
...ob_cdc_stream_postgres_latest_alter_cred.groovy | 4 +-
...ing_job_cdc_stream_postgres_pause_resume.groovy | 27 +-
...aming_job_cdc_stream_postgres_restart_fe.groovy | 288 +++++++++++++++++++++
.../test_streaming_job_restart_fe.groovy | 3 +-
7 files changed, 305 insertions(+), 30 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index cc908a5aa1c..29f9a17e2a9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -108,6 +108,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
public static final String JOB_FILE_CATALOG = "streaming_job";
private long dbId;
// Streaming job statistics, all persisted in txn attachment
+ // when checkpoint image replay need Serialized
+ @SerializedName("jstc")
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
// Non-txn persisted statistics, used for streaming multi task
@Getter
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
index 33451a3763a..a5ddb77e104 100644
---
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
@@ -6,7 +6,7 @@ B1 2
-- !final_data --
A1 1
B1 2
-C1 30
+C1 3
D1 4
E1 5
F1 6
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.out
similarity index 78%
copy from
regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
copy to
regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.out
index 33451a3763a..c351ac3cba5 100644
---
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.out
@@ -1,13 +1,12 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !snapshot_data --
-A1 1
-B1 2
-
-- !final_data --
A1 1
B1 2
-C1 30
+C1 3
D1 4
E1 5
F1 6
+G1 7
+H1 8
+I1 9
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
index 01d3defafaf..f1cd5e13f91 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
@@ -58,10 +58,10 @@
suite("test_streaming_job_cdc_stream_postgres_latest_alter_cred",
sql """
CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
- `name` varchar(200) NOT NULL,
+ `name` varchar(200) NULL,
`age` int NULL
) ENGINE=OLAP
- UNIQUE KEY(`name`)
+ DUPLICATE KEY(`name`)
DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
"""
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
index d1f9263d7ac..90289f90223 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
@@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.SECONDS
* 1. Snapshot phase: pre-existing rows (A1, B1) are synced via full
snapshot.
* 2. Binlog phase begins: INSERT (C1, D1) are applied; job enters steady
binlog state.
* 3. Job is paused; verify status stays PAUSED and currentOffset /
endOffset are non-empty.
- * 4. While paused: INSERT (E1, F1) and UPDATE C1.age → 30 into the PG
source table.
+ * 4. While paused: INSERT (E1, F1) into the PG source table.
* 5. Job is resumed; verify status transitions to RUNNING.
* 6. Verify all rows inserted before and during pause eventually appear in
Doris (no data loss).
* 7. Verify FailedTaskCount == 0 and no error message after resume.
@@ -45,13 +45,12 @@
suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
sql """drop table if exists ${currentDb}.${dorisTable} force"""
- // Use UNIQUE key so that UPDATE rows overwrite in Doris
sql """
CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
- `name` varchar(200) NOT NULL,
+ `name` varchar(200) NULL,
`age` int NULL
) ENGINE=OLAP
- UNIQUE KEY(`name`)
+ DUPLICATE KEY(`name`)
DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
"""
@@ -112,6 +111,8 @@
suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgDB}.${pgSchema}.${pgTable} WHERE name = 'D1'; """
+ log.info("xminResult: " + xminResult)
}
try {
@@ -169,11 +170,10 @@
suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
// ── Phase 6: DML while job is paused
─────────────────────────────────────────
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
- // new rows added while paused
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('E1', 5)"""
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('F1', 6)"""
- // update an existing row while paused; after resume Doris
(UNIQUE) should reflect new value
- sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 30 WHERE
name = 'C1'"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgDB}.${pgSchema}.${pgTable} WHERE name = 'F1'; """
+ log.info("xminResult: " + xminResult)
}
// ── Phase 7: resume the job
────────────────────────────────────────────────────
@@ -204,19 +204,6 @@
suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
throw ex
}
- // wait for UPDATE on C1 to be applied (age=30 in Doris)
- try {
- Awaitility.await().atMost(60, SECONDS).pollInterval(2,
SECONDS).until({
- def rows = sql """SELECT age FROM ${currentDb}.${dorisTable}
WHERE name = 'C1'"""
- log.info("C1 age after resume: " + rows)
- rows.size() == 1 && (rows.get(0).get(0) as int) == 30
- })
- } catch (Exception ex) {
- log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
- log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
- throw ex
- }
-
// ── Phase 9: assert final correctness
─────────────────────────────────────────
qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.groovy
new file mode 100644
index 00000000000..2fb63c10690
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_restart_fe.groovy
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test FE restart recovery of a cdc_stream TVF streaming job for PostgreSQL.
+ *
+ * Two restart scenarios are covered in sequence:
+ *
+ * Restart 1 — mid-snapshot:
+ * snapshot_split_size=1 splits 5 pre-existing rows (A1-E1) into 5 separate
tasks.
+ * FE is restarted after the first task succeeds (SucceedTaskCount >= 1) but
before
+ * all splits complete. This exercises
JdbcTvfSourceOffsetProvider.replayIfNeed()
+ * when currentOffset.snapshotSplit() == true: remainingSplits is rebuilt
from the
+ * meta table using chunkHighWatermarkMap recovered via txn replay, so
already-finished
+ * splits are not re-processed.
+ *
+ * Restart 2 — binlog phase:
+ * After the full snapshot completes and F1/G1 are consumed via binlog, FE
is restarted
+ * again. This exercises the binlog recovery path in replayIfNeed() where
+ * currentOffset.snapshotSplit() == false: currentOffset is restored
directly from
+ * txn replay without any remainingSplits rebuild.
+ * H1/I1 are then inserted to verify the job continues reading binlog
correctly.
+ */
+suite("test_streaming_job_cdc_stream_postgres_restart_fe",
+
"docker,p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_pg_restart_fe"
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.cloudMode = null
+
+ docker(options) {
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_pg_restart_fe_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_pg_restart_fe_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── Phase 1: prepare source table with pre-existing snapshot
rows ───────────
+ // Insert 5 rows so that snapshot_split_size=1 reliably produces
multiple splits.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('E1', 5)"""
+ }
+
+ // ── Phase 2: create streaming job (offset=initial, split_size=1
→ 5 tasks) ─
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name,
age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1"
+ )
+ """
+
+ // ── Phase 3: wait for the first snapshot task to succeed, then
restart FE ──
+ // snapshot_split_size=1 splits 5 rows (A1-E1) into 5 separate
tasks; we restart
+ // after the first one completes to exercise mid-snapshot FE
recovery.
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert")
+ where Name='${jobName}' and
ExecuteType='STREAMING'"""
+ log.info("SucceedTaskCount before restart: " + cnt)
+ cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoBeforeRestart = sql """
+ select status, currentOffset, loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info before first FE restart: " +
jobInfoBeforeRestart)
+ assert jobInfoBeforeRestart.get(0).get(0) == "RUNNING" :
+ "Job should be RUNNING before first restart, got:
${jobInfoBeforeRestart.get(0).get(0)}"
+ assert jobInfoBeforeRestart.get(0).get(1) != null &&
!jobInfoBeforeRestart.get(0).get(1).isEmpty() :
+ "currentOffset should be non-empty before first restart"
+ def scannedRowsBeforeFirstRestart =
parseJson(jobInfoBeforeRestart.get(0).get(2) as String).scannedRows as long
+ log.info("scannedRows before first restart: " +
scannedRowsBeforeFirstRestart)
+
+ // ── Phase 4: restart FE (mid-snapshot)
───────────────────────────────────
+ cluster.restartFrontends()
+ sleep(60000)
+ context.reconnectFe()
+
+ // ── Phase 5: verify job recovers after mid-snapshot restart
──────────────────
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def status = sql """select status from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job status after first restart: " + status)
+ status.size() == 1 && (status.get(0).get(0) == "RUNNING"
|| status.get(0).get(0) == "PENDING")
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoAfterRestart1 = sql """select currentOffset,
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job info after first FE restart: " +
jobInfoAfterRestart1)
+ def scannedRowsAfterFirstRestart =
parseJson(jobInfoAfterRestart1.get(0).get(1) as String).scannedRows as long
+ assert scannedRowsAfterFirstRestart >=
scannedRowsBeforeFirstRestart :
+ "scannedRows should not reset after first FE restart:
before=${scannedRowsBeforeFirstRestart}, after=${scannedRowsAfterFirstRestart}"
+
+ // ── Phase 6: wait for full snapshot to complete, then assert no
re-processing ─
+ // Wait with >= 5 so the await does not time out if a duplicate
row briefly
+ // pushes the count past 5 before we get a chance to observe
exactly 5.
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('A1', 'B1', 'C1', 'D1',
'E1')"""
+ log.info("snapshot rows after restart: " + rows)
+ (rows.get(0).get(0) as int) >= 5
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // Verify no snapshot split was re-processed: DUPLICATE KEY table
would retain
+ // duplicate rows, inflating the count above 5.
+ def snapshotCount = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('A1', 'B1', 'C1', 'D1',
'E1')"""
+ assert (snapshotCount.get(0).get(0) as int) == 5 :
+ "Snapshot rows should be exactly 5 after mid-snapshot
restart (no re-processing), got: ${snapshotCount.get(0).get(0)}"
+
+ // ── Phase 7: insert F1, G1 and wait for binlog to consume them
──────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('F1', 6)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('G1', 7)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('F1', 'G1')"""
+ log.info("binlog rows before second restart: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // ── Phase 8: restart FE in binlog phase
───────────────────────────────────
+ // This exercises the binlog recovery path in replayIfNeed():
+ // currentOffset.snapshotSplit() == false → currentOffset is
reused directly.
+ def jobInfoBeforeSecondRestart = sql """
+ select status, currentOffset, loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info before second FE restart (binlog phase): " +
jobInfoBeforeSecondRestart)
+ def offsetBeforeSecondRestart =
jobInfoBeforeSecondRestart.get(0).get(1) as String
+ assert offsetBeforeSecondRestart != null &&
offsetBeforeSecondRestart.contains("binlog-split") :
+ "currentOffset should be in binlog state before second
restart (F1/G1 consumed), got: ${offsetBeforeSecondRestart}"
+
+ // Record scannedRows before restart — snapshot(5) + F1 + G1 = at
least 7.
+ def scannedRowsBefore =
parseJson(jobInfoBeforeSecondRestart.get(0).get(2) as String).scannedRows as
long
+ log.info("scannedRows before second restart: " + scannedRowsBefore)
+
+ cluster.restartFrontends()
+ sleep(60000)
+ context.reconnectFe()
+
+ // ── Phase 9: verify job recovers after binlog-phase restart
──────────────────
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def status = sql """select status from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job status after second restart: " + status)
+ status.size() == 1 && (status.get(0).get(0) == "RUNNING"
|| status.get(0).get(0) == "PENDING")
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoAfterRestart2 = sql """select currentOffset,
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job info after second FE restart: " +
jobInfoAfterRestart2)
+
+ // Binlog offset advances with heartbeats even without user data,
so offset equality
+ // is not a stable assertion. Verify correctness via row counts
instead:
+ // if binlog offset regressed after restart, F1/G1 would be
re-inserted and count > 2.
+ def binlogCountAfterRestart = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('F1', 'G1')"""
+ assert (binlogCountAfterRestart.get(0).get(0) as int) == 2 :
+ "Binlog rows F1/G1 should be exactly 2 after binlog-phase
restart (no re-processing), got: ${binlogCountAfterRestart.get(0).get(0)}"
+
+ // ── Phase 10: insert H1, I1 and verify binlog still works after
restart ────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('H1', 8)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('I1', 9)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('H1', 'I1')"""
+ log.info("binlog rows after second restart: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER
BY name """
+
+ // ── Phase 11: verify no failures and scannedRows persisted
across restart ──
+ def jobInfoFinal = sql """
+ select status, FailedTaskCount, ErrorMsg, currentOffset,
loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info final: " + jobInfoFinal)
+ assert jobInfoFinal.get(0).get(0) == "RUNNING" : "Job should be
RUNNING at end"
+ assert (jobInfoFinal.get(0).get(1) as int) == 0 : "FailedTaskCount
should be 0"
+
+ // Verify scannedRows is not reset to 0 after FE restart
(jobStatistic must persist).
+ def scannedRowsAfter = parseJson(jobInfoFinal.get(0).get(4) as
String).scannedRows as long
+ log.info("scannedRows after second restart: " + scannedRowsAfter)
+ assert scannedRowsAfter >= scannedRowsBefore :
+ "scannedRows should not reset after FE restart:
before=${scannedRowsBefore}, after=${scannedRowsAfter}"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index ae03afb47a4..973535d6928 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -27,6 +27,7 @@ suite("test_streaming_job_restart_fe", "docker") {
def options = new ClusterOptions()
options.setFeNum(1)
+ options.cloudMode = null
docker(options) {
sql """drop table if exists `${tableName}` force"""
@@ -96,7 +97,6 @@ suite("test_streaming_job_restart_fe", "docker") {
"""
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
def loadStat = parseJson(jobInfo.get(0).get(2))
assert loadStat.scannedRows == 20
assert loadStat.loadBytes == 425
@@ -118,7 +118,6 @@ suite("test_streaming_job_restart_fe", "docker") {
"""
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
def loadStatAfter = parseJson(jobInfo.get(0).get(2))
assert loadStatAfter.scannedRows == 20
assert loadStatAfter.loadBytes == 425
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]