This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa4d2ed99f [regression-test](streaming-job) add cdc operational cases 
for offset modes and pg slot lifecycle (#63514)
7fa4d2ed99f is described below

commit 7fa4d2ed99f67232f2ec11f8f29d8e3d96be2042
Author: wudi <[email protected]>
AuthorDate: Wed May 27 12:48:40 2026 +0800

    [regression-test](streaming-job) add cdc operational cases for offset modes 
and pg slot lifecycle (#63514)
    
    ## Summary
    
    Add 5 regression cases for CDC operational invariants:
    
    - `test_streaming_mysql_job_offset_earliest` — earliest replays binlog
    without snapshot
    - `test_streaming_postgres_job_publication` (Tests 5/6/7) — fail-fast
    validation on missing slot / publication / required tables
    - `test_streaming_postgres_job_drop_during_snapshot` — DROP
    mid-snapshot cleans up auto slot/pub
    - `test_streaming_postgres_job_special_offset_restart_fe` — JSON LSN
    offset survives FE restart
    - `test_streaming_postgres_job_slot_lsn_advance` — confirmed_flush_lsn
    advances, freezes on PAUSE, resumes after RESUME
    
    `drop_during_snapshot` surfaced a real cdc_client concurrency bug:
    HTTP `/api/close` raced the async-write task thread on
    `JdbcIncrementalSourceReader` snapshot-reader lists, raising a
    ConcurrentModificationException (CME) that made
    `PostgresSourceReader.close` abort before dropping the auto-managed
    slot/publication. Fix: thread-safe collections (`CopyOnWriteArrayList` /
    `ConcurrentHashMap.newKeySet`) + serialize reader state mutations
    (`prepareSnapshotSplits` / `prepareStreamSplit` /
    `finishSplitRecords` / `close`) on the reader monitor;
    `pollRecords` stays monitor-free so a blocking poll never fences
    `close` out. Mirrored in `MySqlSourceReader`.
    
    ## Test plan
    - [ ] regression-test runs locally
---
 .../source/reader/JdbcIncrementalSourceReader.java |  66 +++---
 .../source/reader/mysql/MySqlSourceReader.java     |  56 +++---
 .../test_streaming_mysql_job_offset_earliest.out   |   9 +
 ...test_streaming_mysql_job_offset_earliest.groovy | 130 ++++++++++++
 ...eaming_postgres_job_drop_during_snapshot.groovy | 148 ++++++++++++++
 .../test_streaming_postgres_job_publication.groovy | 124 +++++++++++-
 ..._streaming_postgres_job_slot_lsn_advance.groovy | 224 +++++++++++++++++++++
 ...g_postgres_job_special_offset_restart_fe.groovy | 188 +++++++++++++++++
 8 files changed, 881 insertions(+), 64 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 826866e5885..ddbc71c7fd7 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -63,13 +63,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -95,11 +96,11 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                             Fetcher<SourceRecords, SourceSplitBase>,
                             SnapshotSplitState>>
             snapshotReaderContexts;
-    private Set<String> completedSplitIds = new HashSet<>();
+    private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
 
     // Parallel polling support
     private ExecutorService pollExecutor;
-    private List<CompletableFuture<PollResult>> activePollFutures;
+    private volatile List<CompletableFuture<PollResult>> activePollFutures;
 
     // Stream/binlog reader (single reader for stream split)
     private Fetcher<SourceRecords, SourceSplitBase> streamReader;
@@ -109,7 +110,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
 
     public JdbcIncrementalSourceReader() {
         this.serializer = new DebeziumJsonDeserializer();
-        this.snapshotReaderContexts = new ArrayList<>();
+        this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
     }
 
     @Override
@@ -285,7 +286,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
     }
 
     /** Prepare snapshot splits (unified handling for single or multiple 
splits) */
-    private SplitReadResult prepareSnapshotSplits(
+    private synchronized SplitReadResult prepareSnapshotSplits(
             
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit> 
splits,
             JobBaseRecordRequest baseReq)
             throws Exception {
@@ -387,7 +388,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
     }
 
     /** Prepare stream split */
-    private SplitReadResult prepareStreamSplit(
+    private synchronized SplitReadResult prepareStreamSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
         // Load tableSchemas from FE if available (avoids re-discover on 
restart)
         tryLoadTableSchemasFromRequest(baseReq);
@@ -505,7 +506,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
         LOG.info(
                 "Starting parallel polling for {} snapshot readers", 
snapshotReaderContexts.size());
 
-        activePollFutures = new ArrayList<>();
+        activePollFutures = new CopyOnWriteArrayList<>();
 
         for (int i = 0; i < snapshotReaderContexts.size(); i++) {
             final int index = i;
@@ -555,33 +556,30 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
      *     data
      */
     private PollResult waitForAnyCompletion() throws Exception {
-        while (!activePollFutures.isEmpty()) {
-            // Wait for any future to complete
+        List<CompletableFuture<PollResult>> snapshot = activePollFutures;
+        while (snapshot != null && !snapshot.isEmpty()) {
             CompletableFuture<Object> anyOf =
-                    CompletableFuture.anyOf(activePollFutures.toArray(new 
CompletableFuture[0]));
+                    CompletableFuture.anyOf(snapshot.toArray(new 
CompletableFuture[0]));
 
             anyOf.join(); // Wait for at least one to complete
 
             // Find and process completed futures
-            Iterator<CompletableFuture<PollResult>> iterator = 
activePollFutures.iterator();
-            while (iterator.hasNext()) {
-                CompletableFuture<PollResult> future = iterator.next();
-
+            for (CompletableFuture<PollResult> future : snapshot) {
                 if (future.isDone()) {
-                    iterator.remove(); // Remove from active list
+                    snapshot.remove(future);
                     PollResult result = future.get();
                     if (result != null) {
-                        // Found a reader with data, return immediately
                         LOG.info(
                                 "Got result from reader {}, {} futures 
remaining",
                                 result.context.getSplit().splitId(),
-                                activePollFutures.size());
+                                snapshot.size());
                         
completedSplitIds.add(result.context.getSplit().splitId());
                         return result;
                     }
                     // If result is null (no data), continue checking other 
futures
                 }
             }
+            snapshot = activePollFutures;
         }
         // All futures completed but none had data
         return null;
@@ -614,24 +612,30 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
 
     /** Poll records from stream reader */
     private Iterator<SourceRecord> pollRecordsFromStreamReader() throws 
InterruptedException {
+        Fetcher<SourceRecords, SourceSplitBase> reader = streamReader;
+        StreamSplit split = streamSplit;
+        StreamSplitState state = streamSplitState;
+        if (reader == null || split == null || state == null) {
+            LOG.info("Stream reader is null at poll start, returning empty");
+            return Collections.emptyIterator();
+        }
 
-        Preconditions.checkState(streamReader != null, "streamReader is null");
-        Preconditions.checkNotNull(streamSplitState, "streamSplitState is 
null");
-
-        Iterator<SourceRecords> dataIt = streamReader.pollSplitRecords();
+        Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
         if (dataIt == null || !dataIt.hasNext()) {
+            if (streamReader == null) {
+                LOG.info("Stream reader is null after poll, returning empty");
+            }
             return Collections.emptyIterator();
         }
 
         SourceRecords sourceRecords = dataIt.next();
-        SplitRecords splitRecords =
-                new SplitRecords(streamSplit.splitId(), 
sourceRecords.iterator());
+        SplitRecords splitRecords = new SplitRecords(split.splitId(), 
sourceRecords.iterator());
 
         if (!sourceRecords.getSourceRecordList().isEmpty()) {
             LOG.info("{} Records received from stream", 
sourceRecords.getSourceRecordList().size());
         }
 
-        return new FilteredRecordIterator(splitRecords, streamSplitState);
+        return new FilteredRecordIterator(splitRecords, state);
     }
 
     protected abstract DataType fromDbzColumn(Column splitColumn);
@@ -867,7 +871,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
     }
 
     @Override
-    public void finishSplitRecords() {
+    public synchronized void finishSplitRecords() {
         // Cancel any active poll operations
         if (activePollFutures != null) {
             activePollFutures.forEach(f -> f.cancel(true));
@@ -920,19 +924,9 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
             JobBaseConfig config);
 
     @Override
-    public void close(JobBaseConfig jobConfig) {
+    public synchronized void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
-
-        // Cancel any active poll operations
-        if (activePollFutures != null) {
-            activePollFutures.forEach(f -> f.cancel(true));
-            activePollFutures.clear();
-            activePollFutures = null;
-        }
-
-        // Clean up all readers
         finishSplitRecords();
-
         if (tableSchemas != null) {
             tableSchemas.clear();
             tableSchemas = null;
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index f576485c2f8..99ac1e0355b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -79,7 +79,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,6 +87,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -123,11 +123,11 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                     SnapshotReaderContext<
                             MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState>>
             snapshotReaderContexts;
-    private Set<String> completedSplitIds = new HashSet<>();
+    private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
 
     // Parallel polling support
     private ExecutorService pollExecutor;
-    private List<CompletableFuture<PollResult>> activePollFutures;
+    private volatile List<CompletableFuture<PollResult>> activePollFutures;
 
     // Binlog reader (single reader for binlog split)
     private BinlogSplitReader binlogReader;
@@ -136,7 +136,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
 
     public MySqlSourceReader() {
         this.serializer = new MySqlDebeziumJsonDeserializer();
-        this.snapshotReaderContexts = new ArrayList<>();
+        this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
     }
 
     @Override
@@ -341,7 +341,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     /** Prepare snapshot splits (unified handling for single or multiple 
splits) */
-    private SplitReadResult prepareSnapshotSplits(
+    private synchronized SplitReadResult prepareSnapshotSplits(
             List<MySqlSnapshotSplit> splits, JobBaseRecordRequest baseReq) 
throws Exception {
 
         LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
@@ -429,7 +429,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     /** Prepare binlog split */
-    private SplitReadResult prepareBinlogSplit(
+    private synchronized SplitReadResult prepareBinlogSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
         // Load tableSchemas from FE if available (avoids re-discover on 
restart)
         tryLoadTableSchemasFromRequest(baseReq);
@@ -527,7 +527,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         LOG.info(
                 "Starting parallel polling for {} snapshot readers", 
snapshotReaderContexts.size());
 
-        activePollFutures = new ArrayList<>();
+        activePollFutures = new CopyOnWriteArrayList<>();
 
         for (int i = 0; i < snapshotReaderContexts.size(); i++) {
             final int index = i;
@@ -574,33 +574,30 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
      *     data
      */
     private PollResult waitForAnyCompletion() throws Exception {
-        while (!activePollFutures.isEmpty()) {
-            // Wait for any future to complete
+        List<CompletableFuture<PollResult>> snapshot = activePollFutures;
+        while (snapshot != null && !snapshot.isEmpty()) {
             CompletableFuture<Object> anyOf =
-                    CompletableFuture.anyOf(activePollFutures.toArray(new 
CompletableFuture[0]));
+                    CompletableFuture.anyOf(snapshot.toArray(new 
CompletableFuture[0]));
 
             anyOf.join(); // Wait for at least one to complete
 
             // Find and process completed futures
-            Iterator<CompletableFuture<PollResult>> iterator = 
activePollFutures.iterator();
-            while (iterator.hasNext()) {
-                CompletableFuture<PollResult> future = iterator.next();
-
+            for (CompletableFuture<PollResult> future : snapshot) {
                 if (future.isDone()) {
-                    iterator.remove(); // Remove from active list
+                    snapshot.remove(future);
                     PollResult result = future.get();
                     if (result != null) {
-                        // Found a reader with data, return immediately
                         LOG.info(
                                 "Got result from reader {}, {} futures 
remaining",
                                 result.context.getSplit().splitId(),
-                                activePollFutures.size());
+                                snapshot.size());
                         
completedSplitIds.add(result.context.getSplit().splitId());
                         return result;
                     }
                     // If result is null (no data), continue checking other 
futures
                 }
             }
+            snapshot = activePollFutures;
         }
         // All futures completed but none had data
         return null;
@@ -628,24 +625,30 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
 
     /** Poll records from binlog reader */
     private Iterator<SourceRecord> pollRecordsFromBinlogReader() throws 
InterruptedException {
+        BinlogSplitReader reader = binlogReader;
+        MySqlBinlogSplit split = binlogSplit;
+        MySqlBinlogSplitState state = binlogSplitState;
+        if (reader == null || split == null || state == null) {
+            LOG.info("Binlog reader is null at poll start, returning empty");
+            return Collections.emptyIterator();
+        }
 
-        Preconditions.checkState(binlogReader != null, "binlogReader is null");
-        Preconditions.checkNotNull(binlogSplitState, "binlogSplitState is 
null");
-
-        Iterator<SourceRecords> dataIt = binlogReader.pollSplitRecords();
+        Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
         if (dataIt == null || !dataIt.hasNext()) {
+            if (binlogReader == null) {
+                LOG.info("Binlog reader is null after poll, returning empty");
+            }
             return Collections.emptyIterator();
         }
 
         SourceRecords sourceRecords = dataIt.next();
-        SplitRecords splitRecords =
-                new SplitRecords(binlogSplit.splitId(), 
sourceRecords.iterator());
+        SplitRecords splitRecords = new SplitRecords(split.splitId(), 
sourceRecords.iterator());
 
         if (!sourceRecords.getSourceRecordList().isEmpty()) {
             LOG.info("{} Records received from binlog", 
sourceRecords.getSourceRecordList().size());
         }
 
-        return new FilteredRecordIterator(splitRecords, binlogSplitState);
+        return new FilteredRecordIterator(splitRecords, state);
     }
 
     /**
@@ -1024,7 +1027,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     @Override
-    public void finishSplitRecords() {
+    public synchronized void finishSplitRecords() {
 
         // Cancel any active poll operations
         if (activePollFutures != null) {
@@ -1127,9 +1130,8 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     @Override
-    public void close(JobBaseConfig jobConfig) {
+    public synchronized void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
-
         finishSplitRecords();
         if (tableSchemas != null) {
             tableSchemas.clear();
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
new file mode 100644
index 00000000000..54d966bff91
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_after_earliest_replay --
+1      alice
+2      bob
+
+-- !select_after_earliest_incr --
+1      alice_upd
+3      charlie
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
new file mode 100644
index 00000000000..f114e423633
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// offset=earliest is only valid for MySQL. It skips the snapshot phase 
entirely and
+// replays binlog events from the oldest position available on the server. Two
+// invariants this case guards:
+//   1. Pre-existing rows whose INSERT events are still in binlog are picked 
up via
+//      the binlog path (no JDBC snapshot is run).
+//   2. Subsequent binlog DML (INSERT/UPDATE/DELETE) lands as usual.
+// The table name is randomized so binlog events from prior CI runs of this 
case
+// (with the same fixed name) cannot leak in and skew the final state — 
debezium
+// include_tables filtering is the only line of defense and it matches by name.
+suite("test_streaming_mysql_job_offset_earliest",
+        "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") 
{
+    def jobName = "test_streaming_mysql_job_offset_earliest_name"
+    def currentDb = (sql "select database()")[0][0]
+    def suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8)
+    def table1 = "earliest_offset_mysql_tbl_${suffix}"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // Two INSERTs land in binlog BEFORE the job exists. earliest must 
replay them.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `id` int NOT NULL,
+                  `name` varchar(100) DEFAULT NULL,
+                  PRIMARY KEY (`id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice')"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'bob')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "earliest"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+                cnt.size() == 1 && cnt.get(0).get(0) == 2
+            })
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_after_earliest_replay """ SELECT id, name FROM 
${currentDb}.${table1} ORDER BY id """
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')"""
+            sql """UPDATE ${mysqlDb}.${table1} SET name='alice_upd' WHERE 
id=1"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=2"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+                def upd = sql """SELECT name FROM ${currentDb}.${table1} WHERE 
id=1"""
+                def del = sql """SELECT count(*) FROM ${currentDb}.${table1} 
WHERE id=2"""
+                def updName = upd.size() == 0 ? null : upd.get(0).get(0)
+                log.info("incr cnt=${cnt} upd=${updName} del=${del}")
+                cnt.get(0).get(0) == 2 && updName == 'alice_upd' && 
del.get(0).get(0) == 0
+            })
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_earliest_incr """ SELECT id, name FROM 
${currentDb}.${table1} ORDER BY id """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+        }
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
new file mode 100644
index 00000000000..1856a5eb9f3
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// DROP JOB during the snapshot phase (chunks still being dispatched) must 
still
+// clean up auto-managed PG resources. test_streaming_postgres_job_publication 
only
+// covers DROP JOB after the job has reached a steady state — the in-flight 
drop
+// path goes through a different cancel/cleanup branch and historically leaks 
the
+// replication slot if cdc_client cancellation races with the FE-side resource 
drop.
+suite("test_streaming_postgres_job_drop_during_snapshot",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_drop_during_snapshot_job"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "drop_during_snapshot_pg_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def totalRows = 300
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "id" varchar(20) PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name) 
VALUES ")
+            for (int i = 1; i <= totalRows; i++) {
+                if (i > 1) sb.append(", ")
+                String key = "k_" + String.format("%05d", i)
+                sb.append("('${key}', 'name_${i}')")
+            }
+            sql sb.toString()
+        }
+
+        // Small split_size + single parallelism makes the snapshot slow 
enough that
+        // we can reliably catch it mid-flight before DROP.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "5",
+                    "snapshot_parallelism" = "1"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Capture jobId for derived slot/publication names (matches the
+        // doris_cdc_${jobId} / doris_pub_${jobId} pattern in 
DataSourceConfigKeys).
+        def jobRow = sql """select Id from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert jobRow.size() == 1 : "job did not register"
+        def jobId = jobRow.get(0).get(0).toString()
+        def expectedSlot = "doris_cdc_${jobId}"
+        def expectedPub = "doris_pub_${jobId}"
+
+        // Step 1: catch the job mid-snapshot. Confirm slot+publication 
actually exist
+        // on PG before issuing DROP — otherwise the cleanup assertion below 
is vacuous.
+        try {
+            Awaitility.await().atMost(180, SECONDS).pollInterval(1, 
SECONDS).until({
+                def succeed = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}'"""
+                def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}"""
+                log.info("pre-drop succeed=${succeed} rows=${cnt}")
+                succeed.size() == 1 &&
+                        Integer.parseInt(succeed.get(0).get(0).toString()) >= 
2 &&
+                        cnt.get(0).get(0) < totalRows
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            throw ex
+        }
+        def rowsBeforeDrop = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as int
+        assert rowsBeforeDrop < totalRows : "snapshot finished before we could 
DROP — case is moot"
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            def slotBefore = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${expectedSlot}'"""
+            def pubBefore = sql """SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '${expectedPub}'"""
+            log.info("pre-drop pg: slot=${slotBefore} pub=${pubBefore}")
+            assert slotBefore[0][0] == 1 : "auto slot must exist before DROP — 
guard against fixture regression"
+            assert pubBefore[0][0] == 1 : "auto publication must exist before 
DROP"
+        }
+
+        // Step 2: DROP while snapshot tasks are still being dispatched.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // Step 3: slot and publication must both be cleaned up despite 
in-flight task
+        // cancellation. Polling avoids flakiness on slower environments where 
the
+        // FE drop-resources phase runs asynchronously with task cancel.
+        Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).untilAsserted {
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                def slotAfter = sql """SELECT COUNT(1) FROM 
pg_replication_slots WHERE slot_name = '${expectedSlot}'"""
+                def pubAfter = sql """SELECT COUNT(1) FROM pg_publication 
WHERE pubname = '${expectedPub}'"""
+                log.info("post-drop pg: slot=${slotAfter} pub=${pubAfter}")
+                assert slotAfter[0][0] == 0 : "slot ${expectedSlot} leaked 
after DROP during snapshot"
+                assert pubAfter[0][0] == 0 : "publication ${expectedPub} 
leaked after DROP during snapshot"
+            }
+        }
+
+        // Step 4: job row must be gone on FE side.
+        def jobCount = sql """select count(1) from jobs("type"="insert") where 
Name = '${jobName}'"""
+        assert jobCount.get(0).get(0) == 0 : "job row not removed after DROP"
+
+        // Cleanup
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+        }
+        sql """drop table if exists ${currentDb}.${table1} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
index 1d29b659a60..d38d07d8c4e 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy
@@ -20,11 +20,15 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-// Verify per-resource ownership of PG replication slot and publication:
+// Verify per-resource ownership of PG replication slot and publication, plus
+// the fail-fast validation paths in PostgresResourceValidator:
 //   Test 1: auto-generated slot & publication — created on job start, cleaned 
up on drop
 //   Test 2: user-provided slot & publication — Doris uses but never drops them
 //   Test 3: mixed (user publication + auto slot) — only auto slot is dropped 
on job deletion
 //   Test 4: slot_name / publication_name are immutable via ALTER JOB
+//   Test 5: user-provided slot_name that does not exist → actionable error
+//   Test 6: user-provided publication_name that does not exist → actionable 
error
+//   Test 7: user-provided publication exists but is missing required tables → 
actionable error
 suite("test_streaming_postgres_job_publication", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
     def jobName = "test_pg_pub_job"
     def currentDb = (sql "select database()")[0][0]
@@ -401,6 +405,124 @@ suite("test_streaming_postgres_job_publication", 
"p0,external,pg,external_docker
 
         sql """DROP JOB IF EXISTS where jobname = '${alterJob}'"""
 
+        // PostgresResourceValidator fail-fast paths: every CREATE JOB goes 
through
+        // the validator, and operators most often hit it via the three 
misconfigurations
+        // below. Each error must be actionable (mention the missing resource 
by name
+        // and hint at the fix) so support can read the SQL error and move on.
+        def validatorJob = "test_pg_pub_validator_job"
+        sql """DROP JOB IF EXISTS where jobname = '${validatorJob}'"""
+        // Recreate the PG tables — Test 4 left them present but the alterJob 
is gone.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "id" int PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+                  "id" int PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+        }
+
+        // ========== Test 5: user-provided slot_name does not exist ==========
+        // Pre-create a valid publication so the only fail-fast path is the 
slot.
+        def goodPub5 = "test_pg_pub_validator_pub_5"
+        def missingSlot5 = "test_pg_pub_validator_missing_slot_5"
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP PUBLICATION IF EXISTS ${goodPub5}"""
+            sql """CREATE PUBLICATION ${goodPub5} FOR TABLE 
${pgDB}.${pgSchema}.${table1}"""
+            def existing = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${missingSlot5}'"""
+            if (existing[0][0] != 0) {
+                sql """SELECT pg_drop_replication_slot('${missingSlot5}')"""
+            }
+        }
+        test {
+            sql """CREATE JOB ${validatorJob}
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "org.postgresql.Driver",
+                        "user" = "${pgUser}",
+                        "password" = "${pgPassword}",
+                        "database" = "${pgDB}",
+                        "schema" = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "slot_name" = "${missingSlot5}",
+                        "publication_name" = "${goodPub5}",
+                        "offset" = "initial"
+                    )
+                    TO DATABASE ${currentDb} (
+                      "table.create.properties.replication_num" = "1"
+                    )
+                """
+            exception "replication slot does not exist"
+        }
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP PUBLICATION IF EXISTS ${goodPub5}"""
+        }
+
+        // ========== Test 6: user-provided publication_name does not exist 
==========
+        def missingPub6 = "test_pg_pub_validator_missing_pub_6"
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP PUBLICATION IF EXISTS ${missingPub6}"""
+        }
+        test {
+            sql """CREATE JOB ${validatorJob}
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "org.postgresql.Driver",
+                        "user" = "${pgUser}",
+                        "password" = "${pgPassword}",
+                        "database" = "${pgDB}",
+                        "schema" = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "publication_name" = "${missingPub6}",
+                        "offset" = "initial"
+                    )
+                    TO DATABASE ${currentDb} (
+                      "table.create.properties.replication_num" = "1"
+                    )
+                """
+            exception "publication does not exist"
+        }
+
+        // ========== Test 7: user pub exists but does not include the 
requested tables ==========
+        // Publication covers ${table2} only, but the job asks for ${table1} → 
validator
+        // must reject with the actionable "ALTER PUBLICATION ... ADD TABLE" 
hint.
+        def partialPub7 = "test_pg_pub_validator_partial_pub_7"
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP PUBLICATION IF EXISTS ${partialPub7}"""
+            sql """CREATE PUBLICATION ${partialPub7} FOR TABLE 
${pgDB}.${pgSchema}.${table2}"""
+        }
+        test {
+            sql """CREATE JOB ${validatorJob}
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "org.postgresql.Driver",
+                        "user" = "${pgUser}",
+                        "password" = "${pgPassword}",
+                        "database" = "${pgDB}",
+                        "schema" = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "publication_name" = "${partialPub7}",
+                        "offset" = "initial"
+                    )
+                    TO DATABASE ${currentDb} (
+                      "table.create.properties.replication_num" = "1"
+                    )
+                """
+            exception "missing required tables"
+        }
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP PUBLICATION IF EXISTS ${partialPub7}"""
+        }
+
         // Cleanup PG tables
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
new file mode 100644
index 00000000000..0ddf7cf94b2
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// pg_replication_slots.confirmed_flush_lsn must advance as cdc_client consumes
+// WAL, must NOT advance while the job is paused (after the in-flight task
+// finishes its final ack), and must resume advancing after RESUME. A stuck
+// confirmed_flush_lsn means WAL piles up on the source and eventually exhausts
+// disk — operationally this is the single most important health signal for a
+// long-running PG CDC job. See [[project_pgcdc_task_offset_stuck]] for the
+// customer-site bug class this guards against.
+//
+// PAUSE semantics (verified in code):
+//   PipelineCoordinator.writeRecords runs one maxInterval window per task,
+//   then commitSourceOffset (acks LSN to PG) and finishSplitRecords (closes
+//   the replication connection). PAUSE only stops the FE from scheduling the
+//   next task — the in-flight task runs to its natural end and acks LSN one
+//   last time. After that final ack, no consumer exists and no further LSN
+//   advancement is possible regardless of WAL growth on the source.
+// Hence the test waits for LSN to settle after PAUSE before asserting it stays
+// frozen. max_interval=3 keeps the in-flight task short so the settle window
+// stays under 30s.
+//
+// Uses a user-provided slot/publication for two reasons: (1) the slot name is
+// known up front so we don't have to fish jobId out of the jobs() view, and
+// (2) DROP JOB then leaves the slot intact, so post-test cleanup is explicit.
+suite("test_streaming_postgres_job_slot_lsn_advance",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_slot_lsn_advance_job"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "slot_lsn_advance_pg_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def userSlot = "slot_lsn_advance_user_slot"
+    def userPub = "slot_lsn_advance_user_pub"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "id" int PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+            sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+            sql """CREATE PUBLICATION ${userPub} FOR TABLE 
${pgDB}.${pgSchema}.${table1}"""
+            def existing = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${userSlot}'"""
+            if (existing[0][0] != 0) {
+                sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+            }
+            sql """SELECT pg_create_logical_replication_slot('${userSlot}', 
'pgoutput')"""
+        }
+
+        // offset=latest skips snapshot and goes straight to streaming — that 
is
+        // where confirmed_flush_lsn is meaningful. max_interval=3 keeps each
+        // task short so PAUSE settles fast.
+        sql """CREATE JOB ${jobName}
+                PROPERTIES ("max_interval" = "3")
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "slot_name" = "${userSlot}",
+                    "publication_name" = "${userPub}",
+                    "offset" = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        Awaitility.await().atMost(120, SECONDS).pollInterval(1, 
SECONDS).until({
+            def st = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            st.size() == 1 && st.get(0).get(0) == "RUNNING"
+        })
+
+        // Reads the slot's confirmed_flush_lsn and converts PG's "hi/lo" hex text
+        // into a single number: (hi << 32) + lo. BigInteger is used because an LSN
+        // is an unsigned 64-bit value, which a signed long cannot always hold.
+        // Returns -1 when the slot row is missing or confirmed_flush_lsn is NULL.
+        Closure<BigInteger> readLsn = {
+            BigInteger out = BigInteger.valueOf(-1L)
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                def r = sql """SELECT confirmed_flush_lsn::text FROM 
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+                if (r.size() == 1 && r.get(0).get(0) != null) {
+                    def parts = r.get(0).get(0).toString().split("/")
+                    out = new BigInteger(parts[0], 16).shiftLeft(32).add(new 
BigInteger(parts[1], 16))
+                }
+            }
+            out
+        }
+
+        def lsn0 = readLsn()
+        log.info("initial confirmed_flush_lsn=${lsn0}")
+        assert lsn0.signum() > 0 : "user slot ${userSlot} confirmed_flush_lsn 
is null/invalid"
+
+        // ===== Phase 1: steady-state advancement =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            for (int i = 1; i <= 20; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 
'name_${i}')"""
+            }
+        }
+        Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+            def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+            cnt.size() == 1 && cnt.get(0).get(0) >= 20
+        })
+        Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+            def cur = readLsn()
+            log.info("phase1 poll lsn=${cur} (init=${lsn0})")
+            cur > lsn0
+        })
+
+        // ===== Phase 2: PAUSE — in-flight task acks once then LSN must 
freeze =====
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+            def st = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            st.size() == 1 && st.get(0).get(0) == "PAUSED"
+        })
+
+        // Wait for LSN to settle: N consecutive equal samples confirm the 
in-flight
+        // task has completed its final commitSourceOffset and the connection 
is closed.
+        BigInteger lastLsn = BigInteger.valueOf(-1L)
+        int stable = 0
+        final int requiredStable = 4
+        Awaitility.await().atMost(120, SECONDS).pollInterval(4, 
SECONDS).until({
+            BigInteger cur = readLsn()
+            if (cur == lastLsn) {
+                stable++
+            } else {
+                stable = 1
+                lastLsn = cur
+            }
+            log.info("pause-settle lsn=${cur} 
stable=${stable}/${requiredStable}")
+            stable >= requiredStable
+        })
+        def lsnAtPauseSettled = lastLsn
+
+        // Generate WAL while paused. With no consumer the slot's
+        // confirmed_flush_lsn must remain frozen even though WAL is growing.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            for (int i = 100; i < 120; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 
'paused_${i}')"""
+            }
+        }
+        sleep(15000)
+        def lsnDuringPause = readLsn()
+        log.info("paused: settled=${lsnAtPauseSettled} 
duringPause=${lsnDuringPause}")
+        assert lsnDuringPause == lsnAtPauseSettled :
+                "confirmed_flush_lsn advanced while paused with no consumer: " 
+
+                "${lsnAtPauseSettled} -> ${lsnDuringPause}"
+
+        // ===== Phase 3: RESUME — LSN must advance past the value frozen during 
PAUSE =====
+        sql """RESUME JOB where jobname = '${jobName}'"""
+        Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({
+            def st = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            st.size() == 1 && st.get(0).get(0) == "RUNNING"
+        })
+        Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+            def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+            cnt.size() == 1 && cnt.get(0).get(0) >= 40
+        })
+        Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+            def cur = readLsn()
+            log.info("phase3 poll lsn=${cur} (pause=${lsnAtPauseSettled})")
+            cur > lsnAtPauseSettled
+        })
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // User-provided slot/pub survive DROP JOB; clean up manually. After 
DROP JOB
+        // the cdc_client may still be winding down its replication connection 
— PG
+        // rejects pg_drop_replication_slot on an active slot, so poll 
active=false
+        // before issuing the drop.
+        Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({
+            boolean inactive = false
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                def r = sql """SELECT active FROM pg_replication_slots WHERE 
slot_name = '${userSlot}'"""
+                inactive = r.size() == 1 && r.get(0).get(0) == false
+            }
+            inactive
+        })
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            def slotStillThere = sql """SELECT COUNT(1) FROM 
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+            assert slotStillThere[0][0] == 1 : "user-provided slot must not be 
dropped by Doris"
+            sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+            sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+        }
+        sql """drop table if exists ${currentDb}.${table1} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
new file mode 100644
index 00000000000..802e4503890
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Mirror of test_streaming_mysql_job_special_offset_restart_fe for the PG 
path:
+// CREATE JOB with a JSON LSN offset, sync, restart FE, verify currentOffset
+// survives the replay and subsequent DML (streamed from the PG WAL) still lands.
+//
+// PG-specific wrinkle: an auto-managed slot starts retaining WAL only at slot
+// creation time, so a CREATE-with-past-LSN against an auto slot would fail
+// because PG has already purged the requested LSN. We therefore pre-create a
+// user-provided slot first — that pins the WAL retention horizon back in time
+// far enough to make the LSN we capture valid.
+suite("test_streaming_postgres_job_special_offset_restart_fe",
+        "docker,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_special_offset_restart_fe"
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    // TODO: remove once cleanMeta targets the BE that owns the reader (binlog
+    // phase pinned to a single BE). Today selectBackend() round-robins across
+    // BEs, so /api/close can land on a BE that doesn't hold the PG replication
+    // connection — the real holder is never notified and the slot stays 
active.
+    options.setBeNum(1)
+    options.cloudMode = null
+
+    docker(options) {
+        def currentDb = (sql "select database()")[0][0]
+        def table1 = "special_offset_restart_pg_tbl"
+        def pgDB = "postgres"
+        def pgSchema = "cdc_test"
+        def pgUser = "postgres"
+        def pgPassword = "123456"
+        def userSlot = "special_offset_restart_slot"
+        def userPub = "special_offset_restart_pub"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${table1} force"""
+
+        String enabled = context.config.otherConfigs.get("enableJdbcTest")
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            String pg_port = context.config.otherConfigs.get("pg_14_port");
+            String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+            String s3_endpoint = getS3Endpoint()
+            String bucket = getS3BucketName()
+            String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+            // Setup: fresh PG table + fresh user slot/pub. Slot must be 
created
+            // BEFORE the LSN we capture below, otherwise PG would have purged
+            // the WAL covering that LSN by the time the job tries to replay 
it.
+            def lsnAtCreate = ""
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+                sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                      "id" int PRIMARY KEY,
+                      "name" varchar(100)
+                    )"""
+                sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+                sql """CREATE PUBLICATION ${userPub} FOR TABLE 
${pgDB}.${pgSchema}.${table1}"""
+                def existing = sql """SELECT COUNT(1) FROM 
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+                if (existing[0][0] != 0) {
+                    sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+                }
+                sql """SELECT 
pg_create_logical_replication_slot('${userSlot}', 'pgoutput')"""
+
+                // Capture LSN AFTER slot creation, BEFORE the INSERTs the job 
will read.
+                def lsnRows = sql """SELECT pg_current_wal_lsn()::text"""
+                def lsnStr = lsnRows[0][0].toString()
+                def parts = lsnStr.split("/")
+                lsnAtCreate = new BigInteger(parts[0], 16).shiftLeft(32)
+                        .add(new BigInteger(parts[1], 16)).toString()
+                log.info("CREATE LSN mark: ${lsnStr} -> numeric: 
${lsnAtCreate}")
+
+                // Inserts after the mark: these are what the job should 
stream.
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 
'alice')"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 
'bob')"""
+            }
+
+            def offsetJson = """{"lsn":"${lsnAtCreate}"}"""
+            log.info("Creating job with LSN offset: ${offsetJson}")
+            sql """CREATE JOB ${jobName}
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "org.postgresql.Driver",
+                        "user" = "${pgUser}",
+                        "password" = "${pgPassword}",
+                        "database" = "${pgDB}",
+                        "schema" = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "slot_name" = "${userSlot}",
+                        "publication_name" = "${userPub}",
+                        "offset" = '${offsetJson}'
+                    )
+                    TO DATABASE ${currentDb} (
+                      "table.create.properties.replication_num" = "1"
+                    )
+                """
+
+            try {
+                Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def succeed = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                    def cnt = sql """SELECT count(*) FROM 
${currentDb}.${table1}"""
+                    log.info("pre-restart succeed=${succeed} rows=${cnt}")
+                    succeed.size() == 1 &&
+                            (succeed.get(0).get(0) as int) >= 1 &&
+                            cnt.size() == 1 && cnt.get(0).get(0) == 2
+                })
+            } catch (Exception ex) {
+                def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+                def showtask = sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""
+                log.info("show job: " + showjob)
+                log.info("show task: " + showtask)
+                throw ex;
+            }
+
+            def jobInfoBefore = sql """
+                select loadStatistic, status, currentOffset from 
jobs("type"="insert") where Name='${jobName}'
+            """
+            log.info("jobInfoBefore: " + jobInfoBefore)
+            assert jobInfoBefore.get(0).get(1) == "RUNNING"
+
+            // Restart FE — currentOffset must replay cleanly from BDBJE 
editlog + txn attachments.
+            cluster.restartFrontends()
+            sleep(60000)
+            context.reconnectFe()
+
+            def jobInfoAfter = sql """
+                select loadStatistic, status, currentOffset from 
jobs("type"="insert") where Name='${jobName}'
+            """
+            log.info("jobInfoAfter: " + jobInfoAfter)
+            assert jobInfoAfter.get(0).get(1) == "RUNNING"
+            assert jobInfoAfter.get(0).get(2) == jobInfoBefore.get(0).get(2) :
+                    "currentOffset diverged after restart: 
before=${jobInfoBefore.get(0).get(2)} after=${jobInfoAfter.get(0).get(2)}"
+
+            // DML issued after the restart must still be streamed from the WAL.
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 
'charlie')"""
+            }
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def result = sql """SELECT count(*) FROM 
${currentDb}.${table1}"""
+                result[0][0] >= 3
+            })
+
+            sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+            sql """drop table if exists ${currentDb}.${table1} force"""
+
+            // User-provided slot/pub must survive DROP JOB; clean up manually.
+            // After DROP JOB the cdc_client may still be winding down its 
replication
+            // connection — PG rejects pg_drop_replication_slot on an active 
slot, so
+            // poll active=false before issuing the drop.
+            Awaitility.await().atMost(60, SECONDS).pollInterval(1, 
SECONDS).until({
+                boolean inactive = false
+                connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                    def r = sql """SELECT active FROM pg_replication_slots 
WHERE slot_name = '${userSlot}'"""
+                    inactive = r.size() == 1 && r.get(0).get(0) == false
+                }
+                inactive
+            })
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                def slotStillThere = sql """SELECT COUNT(1) FROM 
pg_replication_slots WHERE slot_name = '${userSlot}'"""
+                assert slotStillThere[0][0] == 1 :
+                        "user-provided slot ${userSlot} must not be dropped by 
Doris"
+                sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+                sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to