This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7fa4d2ed99f [regression-test](streaming-job) add cdc operational cases
for offset modes and pg slot lifecycle (#63514)
7fa4d2ed99f is described below
commit 7fa4d2ed99f67232f2ec11f8f29d8e3d96be2042
Author: wudi <[email protected]>
AuthorDate: Wed May 27 12:48:40 2026 +0800
[regression-test](streaming-job) add cdc operational cases for offset modes
and pg slot lifecycle (#63514)
## Summary
Add 5 regression cases for CDC operational invariants:
- `test_streaming_mysql_job_offset_earliest` — earliest replays binlog
without snapshot
- `test_streaming_postgres_job_publication` (Tests 5/6/7) — fail-fast
validation on missing slot / publication / required tables
- `test_streaming_postgres_job_drop_during_snapshot` — DROP
mid-snapshot cleans up auto slot/pub
- `test_streaming_postgres_job_special_offset_restart_fe` — JSON LSN
offset survives FE restart
- `test_streaming_postgres_job_slot_lsn_advance` — `confirmed_flush_lsn`
advances, freezes on PAUSE, resumes after RESUME
`drop_during_snapshot` surfaced a real cdc_client concurrency bug:
HTTP `/api/close` raced the async-write task thread on the
`JdbcIncrementalSourceReader` snapshot-reader lists, throwing a
ConcurrentModificationException (CME) that made
`PostgresSourceReader.close` abort before dropping the auto-managed
slot/publication. Fix: thread-safe collections (`CopyOnWriteArrayList` /
`ConcurrentHashMap.newKeySet`) + serialize reader state mutations
(`prepareSnapshotSplits` / `prepareStreamSplit` /
`finishSplitRecords` / `close`) on the reader monitor;
`pollRecords` stays monitor-free so a blocking poll never fences
`close` out. Mirrored in `MySqlSourceReader`.
## Test plan
- [ ] regression-test runs locally
---
.../source/reader/JdbcIncrementalSourceReader.java | 66 +++---
.../source/reader/mysql/MySqlSourceReader.java | 56 +++---
.../test_streaming_mysql_job_offset_earliest.out | 9 +
...test_streaming_mysql_job_offset_earliest.groovy | 130 ++++++++++++
...eaming_postgres_job_drop_during_snapshot.groovy | 148 ++++++++++++++
.../test_streaming_postgres_job_publication.groovy | 124 +++++++++++-
..._streaming_postgres_job_slot_lsn_advance.groovy | 224 +++++++++++++++++++++
...g_postgres_job_special_offset_restart_fe.groovy | 188 +++++++++++++++++
8 files changed, 881 insertions(+), 64 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 826866e5885..ddbc71c7fd7 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -63,13 +63,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -95,11 +96,11 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>>
snapshotReaderContexts;
- private Set<String> completedSplitIds = new HashSet<>();
+ private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
// Parallel polling support
private ExecutorService pollExecutor;
- private List<CompletableFuture<PollResult>> activePollFutures;
+ private volatile List<CompletableFuture<PollResult>> activePollFutures;
// Stream/binlog reader (single reader for stream split)
private Fetcher<SourceRecords, SourceSplitBase> streamReader;
@@ -109,7 +110,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
public JdbcIncrementalSourceReader() {
this.serializer = new DebeziumJsonDeserializer();
- this.snapshotReaderContexts = new ArrayList<>();
+ this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
}
@Override
@@ -285,7 +286,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
}
/** Prepare snapshot splits (unified handling for single or multiple
splits) */
- private SplitReadResult prepareSnapshotSplits(
+ private synchronized SplitReadResult prepareSnapshotSplits(
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
splits,
JobBaseRecordRequest baseReq)
throws Exception {
@@ -387,7 +388,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
}
/** Prepare stream split */
- private SplitReadResult prepareStreamSplit(
+ private synchronized SplitReadResult prepareStreamSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
// Load tableSchemas from FE if available (avoids re-discover on
restart)
tryLoadTableSchemasFromRequest(baseReq);
@@ -505,7 +506,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
LOG.info(
"Starting parallel polling for {} snapshot readers",
snapshotReaderContexts.size());
- activePollFutures = new ArrayList<>();
+ activePollFutures = new CopyOnWriteArrayList<>();
for (int i = 0; i < snapshotReaderContexts.size(); i++) {
final int index = i;
@@ -555,33 +556,30 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
* data
*/
private PollResult waitForAnyCompletion() throws Exception {
- while (!activePollFutures.isEmpty()) {
- // Wait for any future to complete
+ List<CompletableFuture<PollResult>> snapshot = activePollFutures;
+ while (snapshot != null && !snapshot.isEmpty()) {
CompletableFuture<Object> anyOf =
- CompletableFuture.anyOf(activePollFutures.toArray(new
CompletableFuture[0]));
+ CompletableFuture.anyOf(snapshot.toArray(new
CompletableFuture[0]));
anyOf.join(); // Wait for at least one to complete
// Find and process completed futures
- Iterator<CompletableFuture<PollResult>> iterator =
activePollFutures.iterator();
- while (iterator.hasNext()) {
- CompletableFuture<PollResult> future = iterator.next();
-
+ for (CompletableFuture<PollResult> future : snapshot) {
if (future.isDone()) {
- iterator.remove(); // Remove from active list
+ snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
- // Found a reader with data, return immediately
LOG.info(
"Got result from reader {}, {} futures
remaining",
result.context.getSplit().splitId(),
- activePollFutures.size());
+ snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other
futures
}
}
+ snapshot = activePollFutures;
}
// All futures completed but none had data
return null;
@@ -614,24 +612,30 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
/** Poll records from stream reader */
private Iterator<SourceRecord> pollRecordsFromStreamReader() throws
InterruptedException {
+ Fetcher<SourceRecords, SourceSplitBase> reader = streamReader;
+ StreamSplit split = streamSplit;
+ StreamSplitState state = streamSplitState;
+ if (reader == null || split == null || state == null) {
+ LOG.info("Stream reader is null at poll start, returning empty");
+ return Collections.emptyIterator();
+ }
- Preconditions.checkState(streamReader != null, "streamReader is null");
- Preconditions.checkNotNull(streamSplitState, "streamSplitState is
null");
-
- Iterator<SourceRecords> dataIt = streamReader.pollSplitRecords();
+ Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
+ if (streamReader == null) {
+ LOG.info("Stream reader is null after poll, returning empty");
+ }
return Collections.emptyIterator();
}
SourceRecords sourceRecords = dataIt.next();
- SplitRecords splitRecords =
- new SplitRecords(streamSplit.splitId(),
sourceRecords.iterator());
+ SplitRecords splitRecords = new SplitRecords(split.splitId(),
sourceRecords.iterator());
if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received from stream",
sourceRecords.getSourceRecordList().size());
}
- return new FilteredRecordIterator(splitRecords, streamSplitState);
+ return new FilteredRecordIterator(splitRecords, state);
}
protected abstract DataType fromDbzColumn(Column splitColumn);
@@ -867,7 +871,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
}
@Override
- public void finishSplitRecords() {
+ public synchronized void finishSplitRecords() {
// Cancel any active poll operations
if (activePollFutures != null) {
activePollFutures.forEach(f -> f.cancel(true));
@@ -920,19 +924,9 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
JobBaseConfig config);
@Override
- public void close(JobBaseConfig jobConfig) {
+ public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
-
- // Cancel any active poll operations
- if (activePollFutures != null) {
- activePollFutures.forEach(f -> f.cancel(true));
- activePollFutures.clear();
- activePollFutures = null;
- }
-
- // Clean up all readers
finishSplitRecords();
-
if (tableSchemas != null) {
tableSchemas.clear();
tableSchemas = null;
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index f576485c2f8..99ac1e0355b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -79,7 +79,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -88,6 +87,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -123,11 +123,11 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
SnapshotReaderContext<
MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState>>
snapshotReaderContexts;
- private Set<String> completedSplitIds = new HashSet<>();
+ private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
// Parallel polling support
private ExecutorService pollExecutor;
- private List<CompletableFuture<PollResult>> activePollFutures;
+ private volatile List<CompletableFuture<PollResult>> activePollFutures;
// Binlog reader (single reader for binlog split)
private BinlogSplitReader binlogReader;
@@ -136,7 +136,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
public MySqlSourceReader() {
this.serializer = new MySqlDebeziumJsonDeserializer();
- this.snapshotReaderContexts = new ArrayList<>();
+ this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
}
@Override
@@ -341,7 +341,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
/** Prepare snapshot splits (unified handling for single or multiple
splits) */
- private SplitReadResult prepareSnapshotSplits(
+ private synchronized SplitReadResult prepareSnapshotSplits(
List<MySqlSnapshotSplit> splits, JobBaseRecordRequest baseReq)
throws Exception {
LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
@@ -429,7 +429,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
/** Prepare binlog split */
- private SplitReadResult prepareBinlogSplit(
+ private synchronized SplitReadResult prepareBinlogSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
// Load tableSchemas from FE if available (avoids re-discover on
restart)
tryLoadTableSchemasFromRequest(baseReq);
@@ -527,7 +527,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
LOG.info(
"Starting parallel polling for {} snapshot readers",
snapshotReaderContexts.size());
- activePollFutures = new ArrayList<>();
+ activePollFutures = new CopyOnWriteArrayList<>();
for (int i = 0; i < snapshotReaderContexts.size(); i++) {
final int index = i;
@@ -574,33 +574,30 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
* data
*/
private PollResult waitForAnyCompletion() throws Exception {
- while (!activePollFutures.isEmpty()) {
- // Wait for any future to complete
+ List<CompletableFuture<PollResult>> snapshot = activePollFutures;
+ while (snapshot != null && !snapshot.isEmpty()) {
CompletableFuture<Object> anyOf =
- CompletableFuture.anyOf(activePollFutures.toArray(new
CompletableFuture[0]));
+ CompletableFuture.anyOf(snapshot.toArray(new
CompletableFuture[0]));
anyOf.join(); // Wait for at least one to complete
// Find and process completed futures
- Iterator<CompletableFuture<PollResult>> iterator =
activePollFutures.iterator();
- while (iterator.hasNext()) {
- CompletableFuture<PollResult> future = iterator.next();
-
+ for (CompletableFuture<PollResult> future : snapshot) {
if (future.isDone()) {
- iterator.remove(); // Remove from active list
+ snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
- // Found a reader with data, return immediately
LOG.info(
"Got result from reader {}, {} futures
remaining",
result.context.getSplit().splitId(),
- activePollFutures.size());
+ snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other
futures
}
}
+ snapshot = activePollFutures;
}
// All futures completed but none had data
return null;
@@ -628,24 +625,30 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
/** Poll records from binlog reader */
private Iterator<SourceRecord> pollRecordsFromBinlogReader() throws
InterruptedException {
+ BinlogSplitReader reader = binlogReader;
+ MySqlBinlogSplit split = binlogSplit;
+ MySqlBinlogSplitState state = binlogSplitState;
+ if (reader == null || split == null || state == null) {
+ LOG.info("Binlog reader is null at poll start, returning empty");
+ return Collections.emptyIterator();
+ }
- Preconditions.checkState(binlogReader != null, "binlogReader is null");
- Preconditions.checkNotNull(binlogSplitState, "binlogSplitState is
null");
-
- Iterator<SourceRecords> dataIt = binlogReader.pollSplitRecords();
+ Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
+ if (binlogReader == null) {
+ LOG.info("Binlog reader is null after poll, returning empty");
+ }
return Collections.emptyIterator();
}
SourceRecords sourceRecords = dataIt.next();
- SplitRecords splitRecords =
- new SplitRecords(binlogSplit.splitId(),
sourceRecords.iterator());
+ SplitRecords splitRecords = new SplitRecords(split.splitId(),
sourceRecords.iterator());
if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received from binlog",
sourceRecords.getSourceRecordList().size());
}
- return new FilteredRecordIterator(splitRecords, binlogSplitState);
+ return new FilteredRecordIterator(splitRecords, state);
}
/**
@@ -1024,7 +1027,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
@Override
- public void finishSplitRecords() {
+ public synchronized void finishSplitRecords() {
// Cancel any active poll operations
if (activePollFutures != null) {
@@ -1127,9 +1130,8 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
@Override
- public void close(JobBaseConfig jobConfig) {
+ public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
-
finishSplitRecords();
if (tableSchemas != null) {
tableSchemas.clear();
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
new file mode 100644
index 00000000000..54d966bff91
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_after_earliest_replay --
+1 alice
+2 bob
+
+-- !select_after_earliest_incr --
+1 alice_upd
+3 charlie
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
new file mode 100644
index 00000000000..f114e423633
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// offset=earliest is only valid for MySQL. It skips the snapshot phase
entirely and
+// replays binlog events from the oldest position available on the server. Two
+// invariants this case guards:
+// 1. Pre-existing rows whose INSERT events are still in binlog are picked
up via
+// the binlog path (no JDBC snapshot is run).
+// 2. Subsequent binlog DML (INSERT/UPDATE/DELETE) lands as usual.
+// The table name is randomized so binlog events from prior CI runs of this
case
+// (with the same fixed name) cannot leak in and skew the final state —
debezium
+// include_tables filtering is the only line of defense and it matches by name.
+suite("test_streaming_mysql_job_offset_earliest",
+ "p0,external,mysql,external_docker,external_docker_mysql,nondatalake")
{
+ def jobName = "test_streaming_mysql_job_offset_earliest_name"
+ def currentDb = (sql "select database()")[0][0]
+ def suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8)
+ def table1 = "earliest_offset_mysql_tbl_${suffix}"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // Two INSERTs land in binlog BEFORE the job exists. earliest must
replay them.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `id` int NOT NULL,
+ `name` varchar(100) DEFAULT NULL,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice')"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'bob')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "earliest"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+ cnt.size() == 1 && cnt.get(0).get(0) == 2
+ })
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_after_earliest_replay """ SELECT id, name FROM
${currentDb}.${table1} ORDER BY id """
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')"""
+ sql """UPDATE ${mysqlDb}.${table1} SET name='alice_upd' WHERE
id=1"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=2"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+ def upd = sql """SELECT name FROM ${currentDb}.${table1} WHERE
id=1"""
+ def del = sql """SELECT count(*) FROM ${currentDb}.${table1}
WHERE id=2"""
+ def updName = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} upd=${updName} del=${del}")
+ cnt.get(0).get(0) == 2 && updName == 'alice_upd' &&
del.get(0).get(0) == 0
+ })
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_earliest_incr """ SELECT id, name FROM
${currentDb}.${table1} ORDER BY id """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ }
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
new file mode 100644
index 00000000000..1856a5eb9f3
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// DROP JOB during the snapshot phase (chunks still being dispatched) must
still
+// clean up auto-managed PG resources. test_streaming_postgres_job_publication
only
+// covers DROP JOB after the job has reached a steady state — the in-flight
drop
+// path goes through a different cancel/cleanup branch and historically leaks
the
+// replication slot if cdc_client cancellation races with the FE-side resource
drop.
+suite("test_streaming_postgres_job_drop_during_snapshot",
+ "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_drop_during_snapshot_job"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "drop_during_snapshot_pg_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def totalRows = 300
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "id" varchar(20) PRIMARY KEY,
+ "name" varchar(200)
+ )"""
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name)
VALUES ")
+ for (int i = 1; i <= totalRows; i++) {
+ if (i > 1) sb.append(", ")
+ String key = "k_" + String.format("%05d", i)
+ sb.append("('${key}', 'name_${i}')")
+ }
+ sql sb.toString()
+ }
+
+ // Small split_size + single parallelism makes the snapshot slow
enough that
+ // we can reliably catch it mid-flight before DROP.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "5",
+ "snapshot_parallelism" = "1"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Capture jobId for derived slot/publication names (matches the
+ // doris_cdc_${jobId} / doris_pub_${jobId} pattern in
DataSourceConfigKeys).
+ def jobRow = sql """select Id from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobRow.size() == 1 : "job did not register"
+ def jobId = jobRow.get(0).get(0).toString()
+ def expectedSlot = "doris_cdc_${jobId}"
+ def expectedPub = "doris_pub_${jobId}"
+
+ // Step 1: catch the job mid-snapshot. Confirm slot+publication
actually exist
+ // on PG before issuing DROP — otherwise the cleanup assertion below
is vacuous.
+ try {
+ Awaitility.await().atMost(180, SECONDS).pollInterval(1,
SECONDS).until({
+ def succeed = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}'"""
+ def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}"""
+ log.info("pre-drop succeed=${succeed} rows=${cnt}")
+ succeed.size() == 1 &&
+ Integer.parseInt(succeed.get(0).get(0).toString()) >=
2 &&
+ cnt.get(0).get(0) < totalRows
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+ def rowsBeforeDrop = sql("""SELECT COUNT(*) FROM
${currentDb}.${table1}""").get(0).get(0) as int
+ assert rowsBeforeDrop < totalRows : "snapshot finished before we could
DROP — case is moot"
+
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def slotBefore = sql """SELECT COUNT(1) FROM pg_replication_slots
WHERE slot_name = '${expectedSlot}'"""
+ def pubBefore = sql """SELECT COUNT(1) FROM pg_publication WHERE
pubname = '${expectedPub}'"""
+ log.info("pre-drop pg: slot=${slotBefore} pub=${pubBefore}")
+ assert slotBefore[0][0] == 1 : "auto slot must exist before DROP —
guard against fixture regression"
+ assert pubBefore[0][0] == 1 : "auto publication must exist before
DROP"
+ }
+
+ // Step 2: DROP while snapshot tasks are still being dispatched.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ // Step 3: slot and publication must both be cleaned up despite
in-flight task
+ // cancellation. Polling avoids flakiness on slower environments where
the
+ // FE drop-resources phase runs asynchronously with task cancel.
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).untilAsserted {
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def slotAfter = sql """SELECT COUNT(1) FROM
pg_replication_slots WHERE slot_name = '${expectedSlot}'"""
+ def pubAfter = sql """SELECT COUNT(1) FROM pg_publication
WHERE pubname = '${expectedPub}'"""
+ log.info("post-drop pg: slot=${slotAfter} pub=${pubAfter}")
+ assert slotAfter[0][0] == 0 : "slot ${expectedSlot} leaked
after DROP during snapshot"
+ assert pubAfter[0][0] == 0 : "publication ${expectedPub}
leaked after DROP during snapshot"
+ }
+ }
+
+ // Step 4: job row must be gone on FE side.
+ def jobCount = sql """select count(1) from jobs("type"="insert") where
Name = '${jobName}'"""
+ assert jobCount.get(0).get(0) == 0 : "job row not removed after DROP"
+
+ // Cleanup
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ }
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
index 1d29b659a60..d38d07d8c4e 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
@@ -20,11 +20,15 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-// Verify per-resource ownership of PG replication slot and publication:
+// Verify per-resource ownership of PG replication slot and publication, plus
+// the fail-fast validation paths in PostgresResourceValidator:
// Test 1: auto-generated slot & publication — created on job start, cleaned
up on drop
// Test 2: user-provided slot & publication — Doris uses but never drops them
// Test 3: mixed (user publication + auto slot) — only auto slot is dropped
on job deletion
// Test 4: slot_name / publication_name are immutable via ALTER JOB
+// Test 5: user-provided slot_name that does not exist → actionable error
+// Test 6: user-provided publication_name that does not exist → actionable
error
+// Test 7: user-provided publication exists but is missing required tables →
actionable error
suite("test_streaming_postgres_job_publication",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_pg_pub_job"
def currentDb = (sql "select database()")[0][0]
@@ -401,6 +405,124 @@ suite("test_streaming_postgres_job_publication",
"p0,external,pg,external_docker
sql """DROP JOB IF EXISTS where jobname = '${alterJob}'"""
+ // PostgresResourceValidator fail-fast paths: every CREATE JOB goes
through
+ // the validator, and operators most often hit it via three
misconfigurations
+ // below. Each error must be actionable (mention the missing resource
by name
+ // and hint at the fix) so support can read the SQL error and move on.
+ def validatorJob = "test_pg_pub_validator_job"
+ sql """DROP JOB IF EXISTS where jobname = '${validatorJob}'"""
+ // Recreate the PG tables — Test 4 left them present but the alterJob
is gone.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "id" int PRIMARY KEY,
+ "name" varchar(200)
+ )"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+ "id" int PRIMARY KEY,
+ "name" varchar(200)
+ )"""
+ }
+
+ // ========== Test 5: user-provided slot_name does not exist ==========
+ // Pre-create a valid publication so the only fail-fast path is the
slot.
+ def goodPub5 = "test_pg_pub_validator_pub_5"
+ def missingSlot5 = "test_pg_pub_validator_missing_slot_5"
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP PUBLICATION IF EXISTS ${goodPub5}"""
+ sql """CREATE PUBLICATION ${goodPub5} FOR TABLE
${pgDB}.${pgSchema}.${table1}"""
+ def existing = sql """SELECT COUNT(1) FROM pg_replication_slots
WHERE slot_name = '${missingSlot5}'"""
+ if (existing[0][0] != 0) {
+ sql """SELECT pg_drop_replication_slot('${missingSlot5}')"""
+ }
+ }
+ test {
+ sql """CREATE JOB ${validatorJob}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "slot_name" = "${missingSlot5}",
+ "publication_name" = "${goodPub5}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ exception "replication slot does not exist"
+ }
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP PUBLICATION IF EXISTS ${goodPub5}"""
+ }
+
+ // ========== Test 6: user-provided publication_name does not exist
==========
+ def missingPub6 = "test_pg_pub_validator_missing_pub_6"
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP PUBLICATION IF EXISTS ${missingPub6}"""
+ }
+ test {
+ sql """CREATE JOB ${validatorJob}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "publication_name" = "${missingPub6}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ exception "publication does not exist"
+ }
+
+ // ========== Test 7: user pub exists but does not include the
requested tables ==========
+ // Publication covers ${table2} only, but the job asks for ${table1} →
validator
+ // must reject with the actionable "ALTER PUBLICATION ... ADD TABLE"
hint.
+ def partialPub7 = "test_pg_pub_validator_partial_pub_7"
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP PUBLICATION IF EXISTS ${partialPub7}"""
+ sql """CREATE PUBLICATION ${partialPub7} FOR TABLE
${pgDB}.${pgSchema}.${table2}"""
+ }
+ test {
+ sql """CREATE JOB ${validatorJob}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "publication_name" = "${partialPub7}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ exception "missing required tables"
+ }
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP PUBLICATION IF EXISTS ${partialPub7}"""
+ }
+
// Cleanup PG tables
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
new file mode 100644
index 00000000000..0ddf7cf94b2
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// pg_replication_slots.confirmed_flush_lsn must advance as cdc_client consumes
+// WAL, must NOT advance while the job is paused (after the in-flight task
+// finishes its final ack), and must resume advancing after RESUME. A stuck
+// confirmed_flush_lsn means WAL piles up on the source and eventually exhausts
+// disk — operationally this is the single most important health signal for a
+// long-running PG CDC job. See [[project_pgcdc_task_offset_stuck]] for the
+// customer-site bug class this guards against.
+//
+// PAUSE semantics (verified in code):
+// PipelineCoordinator.writeRecords runs one maxInterval window per task,
+// then commitSourceOffset (acks LSN to PG) and finishSplitRecords (closes
+// the replication connection). PAUSE only stops the FE from scheduling the
+// next task — the in-flight task runs to its natural end and acks LSN one
+// last time. After that final ack, no consumer exists and no further LSN
+// advancement is possible regardless of WAL growth on the source.
+// Hence the test waits for LSN to settle after PAUSE before asserting it stays
+// frozen. max_interval=3 keeps the in-flight task short so the settle window
+// stays under 30s.
+//
+// Uses a user-provided slot/publication for two reasons: (1) the slot name is
+// known up front so we don't have to fish jobId out of the jobs() view, and
+// (2) DROP JOB then leaves the slot intact, so post-test cleanup is explicit.
+suite("test_streaming_postgres_job_slot_lsn_advance",
+ "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_slot_lsn_advance_job"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "slot_lsn_advance_pg_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def userSlot = "slot_lsn_advance_user_slot"
+ def userPub = "slot_lsn_advance_user_pub"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "id" int PRIMARY KEY,
+ "name" varchar(200)
+ )"""
+ sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+ sql """CREATE PUBLICATION ${userPub} FOR TABLE
${pgDB}.${pgSchema}.${table1}"""
+ def existing = sql """SELECT COUNT(1) FROM pg_replication_slots
WHERE slot_name = '${userSlot}'"""
+ if (existing[0][0] != 0) {
+ sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+ }
+ sql """SELECT pg_create_logical_replication_slot('${userSlot}',
'pgoutput')"""
+ }
+
+ // offset=latest skips snapshot and goes straight to streaming — that
is
+ // where confirmed_flush_lsn is meaningful. max_interval=3 keeps each
+ // task short so PAUSE settles fast.
+ sql """CREATE JOB ${jobName}
+ PROPERTIES ("max_interval" = "3")
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "slot_name" = "${userSlot}",
+ "publication_name" = "${userPub}",
+ "offset" = "latest"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ Awaitility.await().atMost(120, SECONDS).pollInterval(1,
SECONDS).until({
+ def st = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ st.size() == 1 && st.get(0).get(0) == "RUNNING"
+ })
+
+ // Read confirmed_flush_lsn as a BigInteger: an LSN spans a full 64
+ // bits (up to FFFFFFFF/FFFFFFFF), more than a signed Long can hold.
+ Closure<BigInteger> readLsn = {
+ BigInteger out = BigInteger.valueOf(-1L)
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def r = sql """SELECT confirmed_flush_lsn::text FROM
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+ if (r.size() == 1 && r.get(0).get(0) != null) {
+ def parts = r.get(0).get(0).toString().split("/")
+ out = new BigInteger(parts[0], 16).shiftLeft(32).add(new
BigInteger(parts[1], 16))
+ }
+ }
+ out
+ }
+
+ def lsn0 = readLsn()
+ log.info("initial confirmed_flush_lsn=${lsn0}")
+ assert lsn0.signum() > 0 : "user slot ${userSlot} confirmed_flush_lsn
is null/invalid"
+
+ // ===== Phase 1: steady-state advancement =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ for (int i = 1; i <= 20; i++) {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i},
'name_${i}')"""
+ }
+ }
+ Awaitility.await().atMost(180, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+ cnt.size() == 1 && cnt.get(0).get(0) >= 20
+ })
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def cur = readLsn()
+ log.info("phase1 poll lsn=${cur} (init=${lsn0})")
+ cur > lsn0
+ })
+
+ // ===== Phase 2: PAUSE — in-flight task acks once then LSN must
freeze =====
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+ def st = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ st.size() == 1 && st.get(0).get(0) == "PAUSED"
+ })
+
+ // Wait for LSN to settle: N consecutive equal samples confirm the
in-flight
+ // task has completed its final commitSourceOffset and the connection
is closed.
+ BigInteger lastLsn = BigInteger.valueOf(-1L)
+ int stable = 0
+ final int requiredStable = 4
+ Awaitility.await().atMost(120, SECONDS).pollInterval(4,
SECONDS).until({
+ BigInteger cur = readLsn()
+ if (cur == lastLsn) {
+ stable++
+ } else {
+ stable = 1
+ lastLsn = cur
+ }
+ log.info("pause-settle lsn=${cur}
stable=${stable}/${requiredStable}")
+ stable >= requiredStable
+ })
+ def lsnAtPauseSettled = lastLsn
+
+ // Generate WAL while paused. With no consumer the slot's
+ // confirmed_flush_lsn must remain frozen even though WAL is growing.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ for (int i = 100; i < 120; i++) {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i},
'paused_${i}')"""
+ }
+ }
+ sleep(15000)
+ def lsnDuringPause = readLsn()
+ log.info("paused: settled=${lsnAtPauseSettled}
duringPause=${lsnDuringPause}")
+ assert lsnDuringPause == lsnAtPauseSettled :
+ "confirmed_flush_lsn advanced while paused with no consumer: "
+
+ "${lsnAtPauseSettled} -> ${lsnDuringPause}"
+
+ // ===== Phase 3: RESUME — LSN must advance past the value settled during PAUSE =====
+ sql """RESUME JOB where jobname = '${jobName}'"""
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({
+ def st = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ st.size() == 1 && st.get(0).get(0) == "RUNNING"
+ })
+ Awaitility.await().atMost(180, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+ cnt.size() == 1 && cnt.get(0).get(0) >= 40
+ })
+ Awaitility.await().atMost(180, SECONDS).pollInterval(2,
SECONDS).until({
+ def cur = readLsn()
+ log.info("phase3 poll lsn=${cur} (pause=${lsnAtPauseSettled})")
+ cur > lsnAtPauseSettled
+ })
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ // User-provided slot/pub survive DROP JOB; clean up manually. After
DROP JOB
+ // the cdc_client may still be winding down its replication connection
— PG
+ // rejects pg_drop_replication_slot on an active slot, so poll
active=false
+ // before issuing the drop.
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({
+ boolean inactive = false
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def r = sql """SELECT active FROM pg_replication_slots WHERE
slot_name = '${userSlot}'"""
+ inactive = r.size() == 1 && r.get(0).get(0) == false
+ }
+ inactive
+ })
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def slotStillThere = sql """SELECT COUNT(1) FROM
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+ assert slotStillThere[0][0] == 1 : "user-provided slot must not be
dropped by Doris"
+ sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+ sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ }
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
new file mode 100644
index 00000000000..802e4503890
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Mirror of test_streaming_mysql_job_special_offset_restart_fe for the PG
path:
+// CREATE JOB with a JSON LSN offset, sync, restart FE, verify currentOffset
+// survives the replay and that subsequent WAL changes (DML) still land.
+//
+// PG-specific wrinkle: an auto-managed slot starts retaining WAL only at slot
+// creation time, so a CREATE-with-past-LSN against an auto slot would fail
+// because PG has already purged the requested LSN. We therefore pre-create a
+// user-provided slot first — that pins the WAL retention horizon back in time
+// far enough to make the LSN we capture valid.
+suite("test_streaming_postgres_job_special_offset_restart_fe",
+ "docker,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_special_offset_restart_fe"
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ // TODO: remove once cleanMeta targets the BE that owns the reader (binlog
+ // phase pinned to a single BE). Today selectBackend() round-robins across
+ // BEs, so /api/close can land on a BE that doesn't hold the PG replication
+ // connection — the real holder is never notified and the slot stays
active.
+ options.setBeNum(1)
+ options.cloudMode = null
+
+ docker(options) {
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "special_offset_restart_pg_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def userSlot = "special_offset_restart_slot"
+ def userPub = "special_offset_restart_pub"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Setup: fresh PG table + fresh user slot/pub. Slot must be
created
+ // BEFORE the LSN we capture below, otherwise PG would have purged
+ // the WAL covering that LSN by the time the job tries to replay
it.
+ def lsnAtCreate = ""
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "id" int PRIMARY KEY,
+ "name" varchar(100)
+ )"""
+ sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+ sql """CREATE PUBLICATION ${userPub} FOR TABLE
${pgDB}.${pgSchema}.${table1}"""
+ def existing = sql """SELECT COUNT(1) FROM
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+ if (existing[0][0] != 0) {
+ sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+ }
+ sql """SELECT
pg_create_logical_replication_slot('${userSlot}', 'pgoutput')"""
+
+ // Capture LSN AFTER slot creation, BEFORE the INSERTs the job
will read.
+ def lsnRows = sql """SELECT pg_current_wal_lsn()::text"""
+ def lsnStr = lsnRows[0][0].toString()
+ def parts = lsnStr.split("/")
+ lsnAtCreate = new BigInteger(parts[0], 16).shiftLeft(32)
+ .add(new BigInteger(parts[1], 16)).toString()
+ log.info("CREATE LSN mark: ${lsnStr} -> numeric:
${lsnAtCreate}")
+
+ // Inserts after the mark: these are what the job should
stream.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'alice')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'bob')"""
+ }
+
+ def offsetJson = """{"lsn":"${lsnAtCreate}"}"""
+ log.info("Creating job with LSN offset: ${offsetJson}")
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "slot_name" = "${userSlot}",
+ "publication_name" = "${userPub}",
+ "offset" = '${offsetJson}'
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def succeed = sql """select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+ def cnt = sql """SELECT count(*) FROM
${currentDb}.${table1}"""
+ log.info("pre-restart succeed=${succeed} rows=${cnt}")
+ succeed.size() == 1 &&
+ (succeed.get(0).get(0) as int) >= 1 &&
+ cnt.size() == 1 && cnt.get(0).get(0) == 2
+ })
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobInfoBefore = sql """
+ select loadStatistic, status, currentOffset from
jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfoBefore: " + jobInfoBefore)
+ assert jobInfoBefore.get(0).get(1) == "RUNNING"
+
+ // Restart FE — currentOffset must replay cleanly from BDBJE
editlog + txn attachments.
+ cluster.restartFrontends()
+ sleep(60000)
+ context.reconnectFe()
+
+ def jobInfoAfter = sql """
+ select loadStatistic, status, currentOffset from
jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfoAfter: " + jobInfoAfter)
+ assert jobInfoAfter.get(0).get(1) == "RUNNING"
+ assert jobInfoAfter.get(0).get(2) == jobInfoBefore.get(0).get(2) :
+ "currentOffset diverged after restart:
before=${jobInfoBefore.get(0).get(2)} after=${jobInfoAfter.get(0).get(2)}"
+
+ // DML issued after the restart must still be streamed to the target table.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3,
'charlie')"""
+ }
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def result = sql """SELECT count(*) FROM
${currentDb}.${table1}"""
+ result[0][0] >= 3
+ })
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ // User-provided slot/pub must survive DROP JOB; clean up manually.
+ // After DROP JOB the cdc_client may still be winding down its
replication
+ // connection — PG rejects pg_drop_replication_slot on an active
slot, so
+ // poll active=false before issuing the drop.
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1,
SECONDS).until({
+ boolean inactive = false
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def r = sql """SELECT active FROM pg_replication_slots
WHERE slot_name = '${userSlot}'"""
+ inactive = r.size() == 1 && r.get(0).get(0) == false
+ }
+ inactive
+ })
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ def slotStillThere = sql """SELECT COUNT(1) FROM
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+ assert slotStillThere[0][0] == 1 :
+ "user-provided slot ${userSlot} must not be dropped by
Doris"
+ sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+ sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]