This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new fe860299e7c branch-4.1: [Feature](streamjob) Streaming job  support 
cdc_stream TVF  #61826 (#61970)
fe860299e7c is described below

commit fe860299e7c4e514f6d61661662732413551c448
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 1 13:30:16 2026 +0800

    branch-4.1: [Feature](streamjob) Streaming job  support cdc_stream TVF  
#61826 (#61970)
    
    Cherry-picked from #61826
    
    Co-authored-by: wudi <[email protected]>
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   1 +
 .../insert/streaming/AbstractStreamingTask.java    |  11 +
 .../insert/streaming/StreamingInsertJob.java       |  50 ++-
 .../insert/streaming/StreamingInsertTask.java      |  11 +-
 .../insert/streaming/StreamingMultiTblTask.java    |   2 +
 .../doris/job/offset/SourceOffsetProvider.java     |  42 +-
 .../job/offset/SourceOffsetProviderFactory.java    |   2 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  42 +-
 .../offset/jdbc/JdbcTvfSourceOffsetProvider.java   | 425 +++++++++++++++++++++
 .../job/offset/s3/S3SourceOffsetProvider.java      |   3 +-
 .../trees/plans/commands/AlterJobCommand.java      |  16 +
 .../java/org/apache/doris/planner/ScanNode.java    |   4 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  12 +
 .../CdcStreamTableValuedFunction.java              |  35 +-
 .../cdcclient/controller/ClientController.java     |   2 +-
 .../cdcclient/service/PipelineCoordinator.java     |  95 +++--
 .../source/reader/mysql/MySqlSourceReader.java     |  17 +
 .../reader/postgres/PostgresSourceReader.java      |   4 +
 .../src/main/resources/application.properties      |   3 +-
 .../tvf/test_streaming_job_cdc_stream_mysql.out    |  11 +
 .../tvf/test_streaming_job_cdc_stream_postgres.out |  11 +
 ...g_job_cdc_stream_postgres_latest_alter_cred.out |  11 +
 ...eaming_job_cdc_stream_postgres_pause_resume.out |  13 +
 .../tvf/test_streaming_job_cdc_stream_mysql.groovy | 128 +++++++
 .../test_streaming_job_cdc_stream_postgres.groovy  | 131 +++++++
 ...ob_cdc_stream_postgres_latest_alter_cred.groovy | 249 ++++++++++++
 ...ing_job_cdc_stream_postgres_pause_resume.groovy | 238 ++++++++++++
 27 files changed, 1497 insertions(+), 72 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 9d2d5034eab..d1efc0b4c88 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,6 +35,7 @@ public class DataSourceConfigKeys {
     public static final String OFFSET_LATEST = "latest";
     public static final String OFFSET_SNAPSHOT = "snapshot";
     public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
+    public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
     public static final String SSL_MODE = "ssl_mode";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 2618d8a122c..25da9ee90a6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -33,6 +33,8 @@ import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @Log4j2
@@ -68,6 +70,15 @@ public abstract class AbstractStreamingTask {
 
     public abstract void run() throws JobException;
 
+    /**
+     * Returns the IDs of backends that ran the scan node for this task.
+     * Subclasses backed by a TVF query (e.g. StreamingInsertTask) override 
this
+     * to return the actual scan backend IDs from the coordinator.
+     */
+    public List<Long> getScanBackendIds() {
+        return Collections.emptyList();
+    }
+
     public abstract boolean onSuccess() throws JobException;
 
     public abstract void closeOrReleaseResources();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c999b9d99e3..cc908a5aa1c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.SourceOffsetProviderFactory;
 import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
+import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
 import org.apache.doris.job.util.StreamingJobUtils;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
@@ -72,6 +73,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 import org.apache.doris.transaction.TransactionException;
@@ -300,8 +302,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             this.tvfType = currentTvf.getFunctionName();
             this.originTvfProps = currentTvf.getProperties().getMap();
             this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
-            // validate offset props
-            if (jobProperties.getOffsetProperty() != null) {
+            this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+            this.offsetProvider.initOnCreate();
+            // validate offset props, only for s3 because the s3 tvf has no offset prop
+            if (jobProperties.getOffsetProperty() != null
+                    && S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) {
                 Offset offset = 
validateOffset(jobProperties.getOffsetProperty());
                 this.offsetProvider.updateOffset(offset);
             }
@@ -536,6 +541,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 if (originTvfProps == null) {
                     this.originTvfProps = 
getCurrentTvf().getProperties().getMap();
                 }
+                // after an FE restart, offsetProvider.jobId may be null
+                offsetProvider.ensureInitialized(getJobId(), originTvfProps);
                 offsetProvider.fetchRemoteMeta(originTvfProps);
             } else {
                 offsetProvider.fetchRemoteMeta(new HashMap<>());
@@ -653,7 +660,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getNumFiles());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileBytes());
-        
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+        Offset endOffset = 
offsetProvider.deserializeOffset(attachment.getOffset());
+        offsetProvider.updateOffset(endOffset);
+        if (!isReplay) {
+            offsetProvider.onTaskCommitted(attachment.getScannedRows(), 
attachment.getLoadBytes());
+            if (runningStreamTask != null) {
+                
offsetProvider.applyEndOffsetToTask(runningStreamTask.getRunningOffset(), 
endOffset);
+            }
+        }
 
         //update metric
         if (MetricRepo.isInit && !isReplay) {
@@ -757,7 +771,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
      */
     private void modifyPropertiesInternal(Map<String, String> inputProperties) 
throws AnalysisException, JobException {
         StreamingJobProperties inputStreamProps = new 
StreamingJobProperties(inputProperties);
-        if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) && 
this.tvfType != null) {
+        if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
+                && S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
             Offset offset = 
validateOffset(inputStreamProps.getOffsetProperty());
             this.offsetProvider.updateOffset(offset);
             if (Config.isCloudMode()) {
@@ -1016,6 +1031,17 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             }
             LoadJob loadJob = loadJobs.get(0);
             LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+            String offsetJson = offsetProvider.getCommitOffsetJson(
+                    runningStreamTask.getRunningOffset(),
+                    runningStreamTask.getTaskId(),
+                    runningStreamTask.getScanBackendIds());
+
+
+            if (StringUtils.isBlank(offsetJson)) {
+                throw new TransactionException("Cannot find offset for 
attachment, load job id is "
+                        + runningStreamTask.getTaskId());
+            }
             txnState.setTxnCommitAttachment(new 
StreamingTaskTxnCommitAttachment(
                         getJobId(),
                         runningStreamTask.getTaskId(),
@@ -1023,7 +1049,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getLoadBytes(),
                         loadStatistic.getFileNumber(),
                         loadStatistic.getTotalFileSizeB(),
-                        
runningStreamTask.getRunningOffset().toSerializedJson()));
+                        offsetJson));
         } finally {
             if (shouldReleaseLock) {
                 writeUnlock();
@@ -1186,10 +1212,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 }
                 checkDataQuality(offsetRequest);
                 updateNoTxnJobStatisticAndOffset(offsetRequest);
-                if (offsetRequest.getScannedRows() == 0 && 
offsetRequest.getLoadBytes() == 0) {
-                    JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
-                    op.setHasMoreData(false);
-                }
+                offsetProvider.onTaskCommitted(offsetRequest.getScannedRows(), 
offsetRequest.getLoadBytes());
                 if (offsetRequest.getTableSchemas() != null) {
                     JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
                     op.setTableSchemas(offsetRequest.getTableSchemas());
@@ -1278,6 +1301,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
      */
     public void cleanup() throws JobException {
         log.info("cleanup streaming job {}", getJobId());
+
         // s3 tvf clean offset
         if (tvfType != null && Config.isCloudMode()) {
             Cloud.DeleteStreamingJobResponse resp = null;
@@ -1298,6 +1322,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             }
         }
 
+        // For TVF path, provider fields may be null after FE restart
+        if (this.offsetProvider instanceof JdbcTvfSourceOffsetProvider) {
+            if (originTvfProps == null) {
+                this.originTvfProps = getCurrentTvf().getProperties().getMap();
+            }
+            offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+        }
+
         if (this.offsetProvider instanceof JdbcSourceOffsetProvider) {
             // jdbc clean chunk meta table
             ((JdbcSourceOffsetProvider) 
this.offsetProvider).cleanMeta(getJobId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 635c639256c..54832eb6fb1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -43,6 +43,7 @@ import lombok.Getter;
 import lombok.extern.log4j.Log4j2;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -98,7 +99,7 @@ public class StreamingInsertTask extends 
AbstractStreamingTask {
         if (!baseCommand.getParsedPlan().isPresent()) {
             throw new JobException("Can not get Parsed plan");
         }
-        this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, 
runningOffset);
+        this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, 
runningOffset, getTaskId());
         this.taskCommand.setLabelName(Optional.of(labelName));
         this.stmtExecutor = new StmtExecutor(ctx, new 
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
     }
@@ -126,6 +127,14 @@ public class StreamingInsertTask extends 
AbstractStreamingTask {
         }
     }
 
+    @Override
+    public List<Long> getScanBackendIds() {
+        if (stmtExecutor != null && stmtExecutor.getCoord() != null) {
+            return stmtExecutor.getCoord().getScanBackendIds();
+        }
+        return Collections.emptyList();
+    }
+
     @Override
     public boolean onSuccess() throws JobException {
         if (getIsCanceled().get()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 2c9fbf6fe17..c35c0a27ac2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -192,6 +192,8 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         Map<String, String> props = generateStreamLoadProps();
         request.setStreamLoadProps(props);
 
+        // `meta` refers to the data synchronized by the job in this run,
+        // while `sourceProperties.offset` is the data entered by the user.
         Map<String, Object> splitMeta = offset.generateMeta();
         Preconditions.checkArgument(!splitMeta.isEmpty(), "split meta is 
empty");
         request.setMeta(splitMeta);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 16fb2394fe3..33a24378704 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -22,6 +22,7 @@ import 
org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -36,6 +37,20 @@ public interface SourceOffsetProvider {
      */
     String getSourceType();
 
+    /**
+     * Initialize the offset provider with job ID and original TVF properties.
+     * Only sets in-memory fields; safe to call on both fresh start and FE 
restart.
+     * May perform remote calls (e.g. fetching snapshot splits), so throws 
JobException.
+     */
+    default void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {}
+
+    /**
+     * Performs one-time initialization that must run only on fresh job 
creation, not on FE restart.
+     * For example, fetching and persisting snapshot splits to the meta table.
+     * Default: no-op (most providers need no extra setup).
+     */
+    default void initOnCreate() throws JobException {}
+
     /**
      * Get next offset to consume
      *
@@ -59,11 +74,12 @@ public interface SourceOffsetProvider {
 
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
+     * Only implemented by TVF-based providers (e.g. S3, cdc_stream).
      *
      * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset nextOffset);
+    InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset nextOffset, long taskId);
 
     /**
      * Update the offset of the source.
@@ -110,6 +126,30 @@ public interface SourceOffsetProvider {
         return null;
     }
 
+    /**
+     * Returns the serialized JSON offset to store in txn commit attachment.
+     * Default: serialize running offset directly (e.g. S3 path).
+     * CDC stream TVF overrides to pull actual end offset from BE after 
fetchRecordStream completes.
+     * scanBackendIds: IDs of the BEs that ran the TVF scan node, used to 
locate taskOffsetCache.
+     */
+    default String getCommitOffsetJson(Offset runningOffset, long taskId, 
List<Long> scanBackendIds) {
+        return runningOffset.toSerializedJson();
+    }
+
+    /**
+     * Called after each task is committed. Providers that track data 
availability
+     * (e.g. JDBC binlog) can use this to update internal state such as 
hasMoreData.
+     * Default: no-op.
+     */
+    default void onTaskCommitted(long scannedRows, long loadBytes) {}
+
+    /**
+     * Applies the end offset from a committed task back onto the running 
offset object
+     * in-place, so that showRange() can display the full [start, end] 
interval.
+     * Default: no-op (only meaningful for JDBC providers).
+     */
+    default void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) 
{}
+
     /**
      * Returns true if the provider has reached a natural completion point
      * and the job should be marked as FINISHED.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
index adc49249eb1..30f9d0edd57 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.offset;
 
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
 import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
 
 import lombok.extern.log4j.Log4j2;
@@ -31,6 +32,7 @@ public class SourceOffsetProviderFactory {
 
     static {
         map.put("s3", S3SourceOffsetProvider.class);
+        map.put("cdc_stream", JdbcTvfSourceOffsetProvider.class);
     }
 
     public static SourceOffsetProvider createSourceOffsetProvider(String 
sourceType) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 964659e79f4..245429ac357 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -70,10 +70,10 @@ import java.util.stream.Collectors;
 public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
     public static final String SPLIT_ID = "splitId";
     private static final ObjectMapper objectMapper = new ObjectMapper();
-    private final int snapshotParallelism;
-    private Long jobId;
-    private DataSourceType sourceType;
-    private Map<String, String> sourceProperties = new HashMap<>();
+    protected int snapshotParallelism = 1;
+    protected Long jobId;
+    protected DataSourceType sourceType;
+    protected Map<String, String> sourceProperties = new HashMap<>();
 
     List<SnapshotSplit> remainingSplits = new ArrayList<>();
     List<SnapshotSplit> finishedSplits = new ArrayList<>();
@@ -92,6 +92,16 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     volatile boolean hasMoreData = true;
 
+    /**
+     * No-arg constructor for subclass use.
+     */
+    public JdbcSourceOffsetProvider() {
+        this.chunkHighWatermarkMap = new HashMap<>();
+    }
+
+    /**
+     * Constructor for FROM Source TO Database.
+     */
     public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, 
Map<String, String> sourceProperties) {
         this.jobId = jobId;
         this.sourceType = sourceType;
@@ -155,10 +165,13 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         return null;
     }
 
+    /**
+     * This method should never be called; it always throws UnsupportedOperationException.
+     */
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset nextOffset) {
-        // todo: only for cdc tvf
-        return null;
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset nextOffset,
+            long taskId) {
+        throw new UnsupportedOperationException("rewriteTvfParams not 
supported for " + getClass().getSimpleName());
     }
 
     @Override
@@ -347,7 +360,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     @Override
     public Offset deserializeOffsetProperty(String offset) {
-        // no need
+        // no need here, because cdc_stream has its own offset property
         return null;
     }
 
@@ -415,7 +428,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
      * Assign the HW value to the synchronized Split,
      * and remove the Split from remainSplit and place it in finishedSplit.
      */
-    private List<SnapshotSplit> recalculateRemainingSplits(
+    protected List<SnapshotSplit> recalculateRemainingSplits(
             Map<String, Map<String, Map<String, String>>> 
chunkHighWatermarkMap,
             Map<String, List<SnapshotSplit>> snapshotSplits) {
         if (this.finishedSplits == null) {
@@ -541,7 +554,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         }
     }
 
-    private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) 
{
+    protected boolean checkNeedSplitChunks(Map<String, String> 
sourceProperties) {
         String startMode = sourceProperties.get(DataSourceConfigKeys.OFFSET);
         if (startMode == null) {
             return false;
@@ -550,11 +563,18 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
                 || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
     }
 
-    private boolean isSnapshotOnlyMode() {
+    protected boolean isSnapshotOnlyMode() {
         String offset = sourceProperties.get(DataSourceConfigKeys.OFFSET);
         return DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
     }
 
+    @Override
+    public void onTaskCommitted(long scannedRows, long loadBytes) {
+        if (scannedRows == 0 && loadBytes == 0) {
+            hasMoreData = false;
+        }
+    }
+
     @Override
     public boolean hasReachedEnd() {
         return isSnapshotOnlyMode()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
new file mode 100644
index 00000000000..803deec145a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -0,0 +1,425 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        // Always refresh fields that may be updated via ALTER JOB (e.g. 
credentials, parallelism).
+        this.sourceProperties = originTvfProps;
+        this.snapshotParallelism = Integer.parseInt(
+                
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+
+        if (this.jobId != null) {
+            return;
+        }
+        // One-time initialization below — safe to skip on FE restart because 
the provider
+        // is reconstructed fresh (getPersistInfo returns null), so jobId is 
null then too.
+        this.jobId = jobId;
+        this.chunkHighWatermarkMap = new HashMap<>();
+        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+        Preconditions.checkArgument(type != null, "type is required");
+        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+        Preconditions.checkArgument(table != null, "table is required for 
cdc_stream TVF");
+    }
+
+    /**
+     * Called once on fresh job creation (not on FE restart).
+     * Fetches snapshot splits from BE and persists them to the meta table.
+     */
+    @Override
+    public void initOnCreate() throws JobException {
+        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+        splitChunks(Collections.singletonList(table));
+    }
+
+    /**
+     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+     * so the BE knows where to start reading and can report
+     * the end offset back via taskOffsetCache.
+     */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand,
+            Offset runningOffset, long taskId) {
+        JdbcOffset offset = (JdbcOffset) runningOffset;
+        Map<String, String> props = new HashMap<>();
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan 
-> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                props.putAll(originTvfRel.getProperties().getMap());
+                props.put(CdcStreamTableValuedFunction.META_KEY, new 
Gson().toJson(offset.generateMeta()));
+                props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, 
String.valueOf(jobId));
+                props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, 
String.valueOf(taskId));
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), 
originTvfRel.getFunctionName(), new Properties(props));
+            }
+            return plan;
+        });
+        InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) 
rewritePlan,
+                Optional.empty(), Optional.empty(), Optional.empty(), true, 
Optional.empty());
+        cmd.setJobId(originCommand.getJobId());
+        return cmd;
+    }
+
+    /**
+     * Returns the serialized JSON offset to store in txn commit attachment.
+     *
+     * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset 
recorded after
+     * fetchRecordStream completes (stored in 
PipelineCoordinator.taskOffsetCache).
+     *
+     * <p>For cloud + snapshot: returns cumulative list (all previously 
completed chunks +
+     * current task's new splits) so that replayOnCloudMode can recover full 
state from latest attachment.
+     * For non-cloud snapshot / binlog: returns only current task's splits.
+     */
+    @Override
+    public String getCommitOffsetJson(Offset runningOffset, long taskId, 
List<Long> scanBackendIds) {
+        List<Map<String, String>> currentTaskEndOffset = 
fetchTaskEndOffset(taskId, scanBackendIds);
+        if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+            return "";
+        }
+
+        // Cloud + snapshot: prepend all previously completed chunks so the 
attachment is
+        // self-contained for replayOnCloudMode (MS only keeps the latest 
attachment)
+        if (Config.isCloudMode() && ((JdbcOffset) 
runningOffset).snapshotSplit()) {
+            List<Map<String, String>> cumulative = 
buildCumulativeSnapshotOffset(currentTaskEndOffset);
+            return new Gson().toJson(cumulative);
+        }
+        return new Gson().toJson(currentTaskEndOffset);
+    }
+
+    /**
+     * Queries each scan backend in order until one returns a non-empty offset 
for this taskId.
+     * Only the BE that ran the cdc_stream TVF scan node will have the entry 
in taskOffsetCache.
+     */
+    private List<Map<String, String>> fetchTaskEndOffset(long taskId, 
List<Long> scanBackendIds) {
+        InternalService.PRequestCdcClientRequest request =
+                InternalService.PRequestCdcClientRequest.newBuilder()
+                        .setApi("/api/getTaskOffset/" + taskId).build();
+        for (Long beId : scanBackendIds) {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                log.info("Backend {} not found for task {}, skipping", beId, 
taskId);
+                continue;
+            }
+            String rawResponse = null;
+            try {
+                TNetworkAddress address = new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+                Future<PRequestCdcClientResult> future =
+                        
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+                InternalService.PRequestCdcClientResult result = future.get();
+                TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    log.warn("Failed to get task {} offset from BE {}: {}", 
taskId,
+                            backend.getHost(), 
result.getStatus().getErrorMsgs(0));
+                    continue;
+                }
+                rawResponse = result.getResponse();
+                ResponseBody<List<Map<String, String>>> responseObj = 
OBJECT_MAPPER.readValue(
+                        rawResponse,
+                        new TypeReference<ResponseBody<List<Map<String, 
String>>>>() {});
+                List<Map<String, String>> data = responseObj.getData();
+                if (!CollectionUtils.isEmpty(data)) {
+                    log.info("Fetched task {} offset from BE {}: {}", taskId, 
backend.getHost(), data);
+                    return data;
+                }
+            } catch (Exception ex) {
+                log.warn("Get task offset error for task {} from BE {}, raw 
response: {}",
+                        taskId, backend.getHost(), rawResponse, ex);
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Merges existing chunkHighWatermarkMap (all previously completed chunks) 
with
+     * current task's new splits, deduplicating by splitId.
+     */
+    private List<Map<String, String>> buildCumulativeSnapshotOffset(
+            List<Map<String, String>> currentTaskSplits) {
+        Set<String> currentSplitIds = currentTaskSplits.stream()
+                .map(m -> m.get(SPLIT_ID)).collect(Collectors.toSet());
+
+        List<Map<String, String>> result = new ArrayList<>();
+        // Add all previously completed chunks (skip any that overlap with 
current task)
+        if (MapUtils.isNotEmpty(chunkHighWatermarkMap)) {
+            for (Map.Entry<String, Map<String, Map<String, String>>> tableEntry
+                    : chunkHighWatermarkMap.entrySet()) {
+                for (Map.Entry<String, Map<String, String>> splitEntry
+                        : tableEntry.getValue().entrySet()) {
+                    if (!currentSplitIds.contains(splitEntry.getKey())) {
+                        Map<String, String> map = new 
HashMap<>(splitEntry.getValue());
+                        map.put(SPLIT_ID, splitEntry.getKey());
+                        result.add(map);
+                    }
+                }
+            }
+        }
+        result.addAll(currentTaskSplits);
+        return result;
+    }
+
+    /**
+     * TVF path updateOffset.
+     *
+     * <p>Snapshot: always writes to chunkHighWatermarkMap (needed for cloud 
cumulative attachment
+     * and FE-restart recovery). In normal flow removeIf finds the split in 
remainingSplits and
+     * adds it to finishedSplits. During txn replay remainingSplits is empty 
so removeIf returns
+     * false naturally — chunkHighWatermarkMap is still updated for 
replayIfNeed to use later.
+     *
+     * <p>Binlog: currentOffset is set above; no extra state needed for TVF 
recovery path.
+     */
+    @Override
+    public void updateOffset(Offset offset) {
+        this.currentOffset = (JdbcOffset) offset;
+        if (currentOffset.snapshotSplit()) {
+            for (AbstractSourceSplit split : currentOffset.getSplits()) {
+                SnapshotSplit ss = (SnapshotSplit) split;
+                boolean removed = remainingSplits.removeIf(v -> {
+                    if (v.getSplitId().equals(ss.getSplitId())) {
+                        ss.setTableId(v.getTableId());
+                        ss.setSplitKey(v.getSplitKey());
+                        ss.setSplitStart(v.getSplitStart());
+                        ss.setSplitEnd(v.getSplitEnd());
+                        return true;
+                    }
+                    return false;
+                });
+                if (removed) {
+                    finishedSplits.add(ss);
+                }
+                chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> 
new HashMap<>())
+                        .put(ss.getSplitId(), ss.getHighWatermark());
+            }
+        }
+        // Binlog: currentOffset is already set; no binlogOffsetPersist needed 
for TVF path.
+    }
+
+    /**
+     * TVF path recovery: offsetProviderPersist is always null (no EditLog 
write).
+     * currentOffset is set by replayOnCommitted/replayOnCloudMode -> 
updateOffset before this runs.
+     *
+     * <ul>
+     *   <li>snapshot: mid-snapshot restart — rebuild remainingSplits from 
meta + chunkHighWatermarkMap</li>
+     *   <li>binlog: currentOffset already correct from updateOffset; nothing 
to do</li>
+     * </ul>
+     */
+    @Override
+    public void replayIfNeed(StreamingInsertJob job) throws JobException {
+        if (currentOffset == null) {
+            // No committed txn yet. If snapshot splits exist in the meta 
table (written by
+            // initOnCreate), restore remainingSplits so getNextOffset() 
returns snapshot splits
+            // instead of a BinlogSplit (which would incorrectly skip the 
snapshot phase).
+            Map<String, List<SnapshotSplit>> snapshotSplits = 
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            if (MapUtils.isNotEmpty(snapshotSplits)) {
+                recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
+                log.info("Replaying TVF offset provider for job {}: no 
committed txn,"
+                        + " restored {} remaining splits from meta", 
job.getJobId(), remainingSplits.size());
+            } else {
+                log.info("Replaying TVF offset provider for job {}: no 
committed txn,"
+                        + " no snapshot splits in meta", job.getJobId());
+            }
+            return;
+        }
+        if (currentOffset.snapshotSplit()) {
+            log.info("Replaying TVF offset provider for job {}: restoring 
snapshot state from txn replay",
+                    job.getJobId());
+            Map<String, List<SnapshotSplit>> snapshotSplits = 
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            if (MapUtils.isNotEmpty(snapshotSplits)) {
+                // During replay, buildTableKey() may have stored entries 
under "null.null"
+                // because sourceProperties was not yet initialized. Remap all 
committed splits
+                // under the actual table key so recalculateRemainingSplits 
can match them.
+                Map<String, Map<String, Map<String, String>>> effectiveMap =
+                        remapChunkHighWatermarkMap(snapshotSplits);
+                List<SnapshotSplit> lastSnapshotSplits =
+                        recalculateRemainingSplits(effectiveMap, 
snapshotSplits);
+                if (remainingSplits.isEmpty()) {
+                    if (!lastSnapshotSplits.isEmpty()) {
+                        currentOffset = new JdbcOffset(lastSnapshotSplits);
+                    } else if (!isSnapshotOnlyMode()) {
+                        BinlogSplit binlogSplit = new BinlogSplit();
+                        binlogSplit.setFinishedSplits(finishedSplits);
+                        currentOffset = new 
JdbcOffset(Collections.singletonList(binlogSplit));
+                    }
+                }
+            }
+        } else {
+            log.info("Replaying TVF offset provider for job {}: binlog offset 
already set, nothing to do",
+                    job.getJobId());
+        }
+    }
+
+    /**
+     * Builds the chunkHighWatermarkMap outer key as schema.table (if schema 
is present)
+     * or database.table, matching the format used by snapshotSplits keys in
+     * recalculateRemainingSplits.
+     */
+    private String buildTableKey() {
+        String schema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
+        String qualifier = (schema != null && !schema.isEmpty())
+                ? schema : sourceProperties.get(DataSourceConfigKeys.DATABASE);
+        return qualifier + "." + 
sourceProperties.get(DataSourceConfigKeys.TABLE);
+    }
+
+    /**
+     * Remaps chunkHighWatermarkMap to use the actual table key from 
snapshotSplits.
+     *
+     * <p>During FE-restart replay, buildTableKey() produces "null.null" 
because sourceProperties
+     * is not yet initialized (ensureInitialized has not been called). This 
causes
+     * recalculateRemainingSplits to miss all committed splits (key mismatch) 
and re-process
+     * them, leading to data duplication.
+     *
+     * <p>The fix: flatten all inner splitId-&gt;highWatermark entries across 
every outer key and
+     * re-key them under the actual table name read from the meta table 
(snapshotSplits key).
+     * TVF always has exactly one table, so snapshotSplits has exactly one 
entry.
+     *
+     * <p>Structure: tableName -&gt; splitId -&gt; chunk of highWatermark.
+     */
+    private Map<String, Map<String, Map<String, String>>> 
remapChunkHighWatermarkMap(
+            Map<String, List<SnapshotSplit>> snapshotSplits) {
+        if (MapUtils.isEmpty(chunkHighWatermarkMap)) {
+            return chunkHighWatermarkMap;
+        }
+        // Flatten all committed splitId -> highWatermark entries, ignoring 
the (possibly wrong) outer key
+        Map<String, Map<String, String>> flatCommitted = new HashMap<>();
+        for (Map<String, Map<String, String>> inner : 
chunkHighWatermarkMap.values()) {
+            if (inner != null) {
+                flatCommitted.putAll(inner);
+            }
+        }
+        if (flatCommitted.isEmpty()) {
+            return chunkHighWatermarkMap;
+        }
+        // Re-key under the actual table name from snapshotSplits (TVF has 
exactly one table).
+        // getTableName() in recalculateRemainingSplits splits by "." and 
takes the last segment,
+        // so using the plain table name as key (no dots) ensures a direct 
match.
+        String actualTableKey = snapshotSplits.keySet().iterator().next();
+        Map<String, Map<String, Map<String, String>>> remapped = new 
HashMap<>();
+        remapped.put(actualTableKey, flatCommitted);
+        return remapped;
+    }
+
+    /**
+     * TVF path does not persist to EditLog; state is recovered via txn replay.
+     * This override is defensive — the persistOffsetProviderIfNeed() call path
+     * only runs in the non-TVF commitOffset flow and won't reach here.
+     */
+    @Override
+    public String getPersistInfo() {
+        return null;
+    }
+
+    @Override
+    public void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) {
+        if (!(runningOffset instanceof JdbcOffset) || !(endOffset instanceof 
JdbcOffset)) {
+            return;
+        }
+        JdbcOffset running = (JdbcOffset) runningOffset;
+        JdbcOffset end = (JdbcOffset) endOffset;
+        if (running.snapshotSplit()) {
+            for (int i = 0; i < running.getSplits().size() && i < 
end.getSplits().size(); i++) {
+                SnapshotSplit rSplit = (SnapshotSplit) 
running.getSplits().get(i);
+                SnapshotSplit eSplit = (SnapshotSplit) end.getSplits().get(i);
+                rSplit.setHighWatermark(eSplit.getHighWatermark());
+            }
+        } else {
+            BinlogSplit rSplit = (BinlogSplit) running.getSplits().get(0);
+            BinlogSplit eSplit = (BinlogSplit) end.getSplits().get(0);
+            // deserializeOffset stores binlog position in startingOffset
+            rSplit.setEndingOffset(eSplit.getStartingOffset());
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 23996fb8f38..886e03f21c3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -131,7 +131,8 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset runningOffset) {
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand,
+            Offset runningOffset, long taskId) {
         S3Offset offset = (S3Offset) runningOffset;
         Map<String, String> props = new HashMap<>();
         // rewrite plan
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 584ee0848db..fc63710f93d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -246,6 +246,22 @@ public class AlterJobCommand extends AlterCommand 
implements ForwardWithSync, Ne
                         inputTvf.getProperties().getMap().get("uri")),
                         "The uri property cannot be modified in ALTER JOB");
                 break;
+            case "cdc_stream":
+                // type, jdbc_url, database, schema, and table identify the 
source and cannot be changed.
+                // user, password, driver_url, driver_class, etc. are 
modifiable (credential rotation).
+                for (String unmodifiable : new String[] {
+                        DataSourceConfigKeys.TYPE,
+                        DataSourceConfigKeys.JDBC_URL,
+                        DataSourceConfigKeys.DATABASE,
+                        DataSourceConfigKeys.SCHEMA,
+                        DataSourceConfigKeys.TABLE}) {
+                    Preconditions.checkArgument(
+                            Objects.equals(
+                                    
originTvf.getProperties().getMap().get(unmodifiable),
+                                    
inputTvf.getProperties().getMap().get(unmodifiable)),
+                            "The '%s' property cannot be modified in ALTER JOB 
for cdc_stream", unmodifiable);
+                }
+                break;
             default:
                 throw new IllegalArgumentException("Unsupported tvf type:" + 
inputTvf.getFunctionName());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 38e4059711c..9ecdb14e90b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -574,6 +574,10 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
         return scanBackendIds.size();
     }
 
+    public Set<Long> getScanBackendIds() {
+        return scanBackendIds;
+    }
+
     public int getScanRangeNum() {
         return Integer.MAX_VALUE;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 397ddfb982b..dac24f2f798 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3516,6 +3516,18 @@ public class Coordinator implements CoordInterface {
         return backendAddresses;
     }
 
+    /**
+     * Returns the IDs of backends that have scan ranges assigned, collected 
from each ScanNode's
+     * scanBackendIds (populated during plan phase).
+     */
+    public List<Long> getScanBackendIds() {
+        Set<Long> result = Sets.newHashSet();
+        for (ScanNode scanNode : scanNodes) {
+            result.addAll(scanNode.getScanBackendIds());
+        }
+        return Lists.newArrayList(result);
+    }
+
     public List<PlanFragment> getFragments() {
         return fragments;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 621d2fbe6b6..ce91f8a7fe7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -29,7 +29,10 @@ import org.apache.doris.job.util.StreamingJobUtils;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileType;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,6 +44,14 @@ import java.util.UUID;
 public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
     private static final ObjectMapper objectMapper = new ObjectMapper();
     private static final String URI = 
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+    private static final String ENABLE_CDC_CLIENT_KEY = "enable_cdc_client";
+    private static final String HTTP_ENABLE_RANGE_REQUEST_KEY = 
"http.enable.range.request";
+    private static final String HTTP_ENABLE_CHUNK_RESPONSE_KEY = 
"http.enable.chunk.response";
+    private static final String HTTP_METHOD_KEY = "http.method";
+    private static final String HTTP_PAYLOAD_KEY = "http.payload";
+    public static final String JOB_ID_KEY = "job.id";
+    public static final String TASK_ID_KEY = "task.id";
+    public static final String META_KEY = "meta";
 
     public CdcStreamTableValuedFunction(Map<String, String> properties) throws 
AnalysisException {
         validate(properties);
@@ -51,24 +62,34 @@ public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunctio
         Map<String, String> copyProps = new HashMap<>(properties);
         copyProps.put("format", "json");
         super.parseCommonProperties(copyProps);
-        this.processedParams.put("enable_cdc_client", "true");
-        this.processedParams.put("uri", URI);
-        this.processedParams.put("http.enable.range.request", "false");
-        this.processedParams.put("http.enable.chunk.response", "true");
-        this.processedParams.put("http.method", "POST");
+        this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true");
+        this.processedParams.put(URI_KEY, URI);
+        this.processedParams.put(HTTP_ENABLE_RANGE_REQUEST_KEY, "false");
+        this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true");
+        this.processedParams.put(HTTP_METHOD_KEY, "POST");
 
         String payload = generateParams(properties);
-        this.processedParams.put("http.payload", payload);
+        this.processedParams.put(HTTP_PAYLOAD_KEY, payload);
         this.backendConnectProperties.putAll(processedParams);
         generateFileStatus();
     }
 
     private String generateParams(Map<String, String> properties) throws 
AnalysisException {
         FetchRecordRequest recordRequest = new FetchRecordRequest();
-        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        String defaultJobId = UUID.randomUUID().toString().replace("-", "");
+        recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY, 
defaultJobId));
         recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
         recordRequest.setConfig(properties);
         try {
+            // for tvf with job
+            if (properties.containsKey(TASK_ID_KEY)) {
+                recordRequest.setTaskId(properties.remove(TASK_ID_KEY));
+                String meta = properties.remove(META_KEY);
+                Preconditions.checkArgument(StringUtils.isNotEmpty(meta), 
"meta is required when task.id is provided");
+                Map<String, Object> metaMap = objectMapper.readValue(meta, new 
TypeReference<Map<String, Object>>() {});
+                recordRequest.setMeta(metaMap);
+            }
+
             return objectMapper.writeValueAsString(recordRequest);
         } catch (IOException e) {
             LOG.warn("Failed to serialize fetch record request", e);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 0b677208404..3d6f1af85d1 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -135,7 +135,7 @@ public class ClientController {
     }
 
     @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = 
RequestMethod.POST)
-    public Object getTaskIdOffset(@PathVariable String taskId) {
+    public Object getTaskIdOffset(@PathVariable("taskId") String taskId) {
         return 
RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
     }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 850c3038b91..46fabe2d418 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -74,8 +74,9 @@ public class PipelineCoordinator {
     private static final String SPLIT_ID = "splitId";
     // jobId
     private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
-    // taskId, offset
-    private final Map<String, Map<String, String>> taskOffsetCache = new 
ConcurrentHashMap<>();
+    // taskId -> list of split offsets (accumulates all splits processed in 
one task)
+    private final Map<String, List<Map<String, String>>> taskOffsetCache =
+            new ConcurrentHashMap<>();
     // taskId -> writeFailReason
     private final Map<String, String> taskErrorMaps = new 
ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
@@ -107,14 +108,16 @@ public class PipelineCoordinator {
         SourceReader sourceReader;
         SplitReadResult readResult;
         try {
+            LOG.info(
+                    "Fetch record request with meta {}, jobId={}, taskId={}",
+                    fetchReq.getMeta(),
+                    fetchReq.getJobId(),
+                    fetchReq.getTaskId());
+            // TVF doesn't have meta value; meta need to be extracted from the 
offset.
             if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
-                LOG.info(
-                        "Generate initial meta for fetch record request, 
jobId={}, taskId={}",
-                        fetchReq.getJobId(),
-                        fetchReq.getTaskId());
-                // means the request did not originate from the job, only tvf
                 Map<String, Object> meta = generateMeta(fetchReq.getConfig());
                 fetchReq.setMeta(meta);
+                LOG.info("Generated meta for job {}: {}", fetchReq.getJobId(), 
meta);
             }
 
             sourceReader = Env.getCurrentEnv().getReader(fetchReq);
@@ -144,19 +147,33 @@ public class PipelineCoordinator {
             OutputStream rawOutputStream)
             throws Exception {
         SourceSplit split = readResult.getSplit();
-        Map<String, String> lastMeta = null;
+        boolean isSnapshotSplit = sourceReader.isSnapshotSplit(split);
         int rowCount = 0;
+        int heartbeatCount = 0;
         BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        boolean hasReceivedData = false;
+        boolean lastMessageIsHeartbeat = false;
+        long startTime = System.currentTimeMillis();
         try {
-            // Poll records using the existing mechanism
             boolean shouldStop = false;
-            long startTime = System.currentTimeMillis();
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    fetchRecord.getJobId(),
+                    fetchRecord.getTaskId(),
+                    isSnapshotSplit);
             while (!shouldStop) {
                 Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
                 if (!recordIterator.hasNext()) {
                     Thread.sleep(100);
                     long elapsedTime = System.currentTimeMillis() - startTime;
-                    if (elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS) {
+                    boolean timeoutReached = elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            hasReceivedData,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+                            timeoutReached)) {
                         break;
                     }
                     continue;
@@ -164,8 +181,19 @@ public class PipelineCoordinator {
                 while (recordIterator.hasNext()) {
                     SourceRecord element = recordIterator.next();
                     if (isHeartbeatEvent(element)) {
-                        shouldStop = true;
-                        break;
+                        heartbeatCount++;
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+                        if (!isSnapshotSplit && timeoutReached) {
+                            shouldStop = true;
+                            break;
+                        }
+                        // Heartbeat before timeout: skip and keep reading.
+                        continue;
                     }
                     DeserializeResult result =
                             sourceReader.deserialize(fetchRecord.getConfig(), 
element);
@@ -175,9 +203,18 @@ public class PipelineCoordinator {
                             bos.write(LINE_DELIMITER);
                         }
                         rowCount += result.getRecords().size();
+                        hasReceivedData = true;
+                        lastMessageIsHeartbeat = false;
                     }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    rowCount,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    fetchRecord.getJobId(),
+                    fetchRecord.getTaskId());
             // force flush buffer
             bos.flush();
         } finally {
@@ -186,32 +223,10 @@ public class PipelineCoordinator {
             sourceReader.finishSplitRecords();
         }
 
-        LOG.info(
-                "Fetch records completed, jobId={}, taskId={}, splitId={}, 
rowCount={}",
-                fetchRecord.getJobId(),
-                fetchRecord.getTaskId(),
-                split.splitId(),
-                rowCount);
-
-        if (readResult.getSplitState() != null) {
-            if (sourceReader.isSnapshotSplit(split)) {
-                lastMeta = 
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                lastMeta.put(SPLIT_ID, split.splitId());
-            } else if (sourceReader.isBinlogSplit(split)) {
-                lastMeta = 
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-            } else {
-                throw new RuntimeException(
-                        "unknown split type: " + 
split.getClass().getSimpleName());
-            }
-        } else {
-            throw new RuntimeException("split state is null");
-        }
-
+        List<Map<String, String>> offsetMeta = extractOffsetMeta(sourceReader, 
readResult);
         if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
-            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+            taskOffsetCache.put(fetchRecord.getTaskId(), offsetMeta);
         }
-
         // Convention: standalone TVF uses a UUID jobId; job-driven TVF will 
use a numeric Long
         // jobId (set via rewriteTvfParams). When the job-driven path is 
implemented,
         // rewriteTvfParams must inject the job's Long jobId into the TVF 
properties
@@ -751,8 +766,8 @@ public class PipelineCoordinator {
         return commitOffsets;
     }
 
-    public Map<String, String> getOffsetWithTaskId(String taskId) {
-        Map<String, String> taskOffset = taskOffsetCache.remove(taskId);
-        return taskOffset == null ? new HashMap<>() : taskOffset;
+    public List<Map<String, String>> getOffsetWithTaskId(String taskId) {
+        List<Map<String, String>> taskOffset = taskOffsetCache.remove(taskId);
+        return taskOffset == null ? new ArrayList<>() : taskOffset;
     }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index a4f3a9e2547..9d77f3912eb 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -62,6 +62,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -848,6 +849,22 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                     
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)));
         }
 
+        // TODO: Currently only one split key is supported; support for
+        // multiple split keys will be added in the future.
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)) {
+            String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
+            String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+            Preconditions.checkArgument(
+                    database != null && !database.isEmpty() && table != null 
&& !table.isEmpty(),
+                    "When '%s' is set, both '%s' and '%s' must be configured 
(include_tables is not supported).",
+                    DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY,
+                    DataSourceConfigKeys.DATABASE,
+                    DataSourceConfigKeys.TABLE);
+            ObjectPath objectPath = new ObjectPath(database, table);
+            configFactory.chunkKeyColumn(
+                    objectPath, 
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
+        }
+
         return configFactory.createConfig(0);
     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 492e0650d4b..b6d28510613 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -221,6 +221,10 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
                     
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)));
         }
 
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)) {
+            
configFactory.chunkKeyColumn(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
+        }
+
         Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
         dbzProps.put("interval.handling.mode", "string");
         configFactory.debeziumProperties(dbzProps);
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties 
b/fs_brokers/cdc_client/src/main/resources/application.properties
index a7ea3996a6f..d22c98d434b 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -19,4 +19,5 @@ spring.web.resources.add-mappings=false
 server.port=9096
 backend.http.port=8040
 # see doris-meta/image/VERSION
-cluster.token=cluster-token
\ No newline at end of file
+cluster.token=cluster-token
+spring.mvc.async.request-timeout=300000
\ No newline at end of file
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
new file mode 100644
index 00000000000..c9f1ceb1307
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !snapshot_data --
+A1     1
+B1     2
+
+-- !final_data --
+A1     1
+B1     2
+C1     3
+D1     4
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
new file mode 100644
index 00000000000..c9f1ceb1307
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !snapshot_data --
+A1     1
+B1     2
+
+-- !final_data --
+A1     1
+B1     2
+C1     3
+D1     4
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
new file mode 100644
index 00000000000..3f6c816fa20
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !offset_latest_data --
+C1     3
+D1     4
+
+-- !final_data --
+C1     3
+D1     4
+E1     5
+F1     6
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
new file mode 100644
index 00000000000..33451a3763a
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !snapshot_data --
+A1     1
+B1     2
+
+-- !final_data --
+A1     1
+B1     2
+C1     30
+D1     4
+E1     5
+F1     6
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
new file mode 100644
index 00000000000..fdc50c76118
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for MySQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_mysql", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_mysql_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_mysql_tbl"
+    def mysqlDb = "test_cdc_db"
+    def mysqlTable = "test_streaming_job_cdc_stream_mysql_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
+            sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
+                      `name` varchar(200) NOT NULL,
+                      `age`  int DEFAULT NULL
+                  ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('A1', 1)"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "mysql",
+                "jdbc_url"     = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user"         = "root",
+                "password"     = "123456",
+                "database"          = "${mysqlDb}",
+                "table"             = "${mysqlTable}",
+                "offset"            = "initial",
+                "snapshot_split_key" = "name"
+            )
+        """
+
+        // wait for at least one snapshot task to succeed
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // verify snapshot data
+        qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // insert incremental rows in MySQL
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('C1', 3)"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('D1', 4)"""
+        }
+
+        // wait for binlog tasks to pick up the new rows
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+                log.info("incremental rows: " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${dorisTable} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
new file mode 100644
index 00000000000..3fa853e2118
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_postgres_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"             = "${pgSchema}",
+                "table"              = "${pgTable}",
+                "offset"             = "initial",
+                "snapshot_split_size"             = "1"
+            )
+        """
+
+        // wait for at least two snapshot tasks to succeed (snapshot_split_size=1)
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // verify snapshot data
+        qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // insert incremental rows in PostgreSQL
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('C1', 3)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('D1', 4)"""
+        }
+
+        // wait for binlog tasks to pick up the new rows
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+                log.info("incremental rows: " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${dorisTable} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
new file mode 100644
index 00000000000..01d3defafaf
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test cdc_stream TVF streaming job with offset=latest and credential update 
via ALTER JOB.
+ *
+ * Scenario A — offset=latest:
+ *   1. Pre-existing rows (A1, B1) are present in the PG source table before 
the job is created.
+ *   2. Job is created with offset=latest: snapshot is skipped, only changes 
after job creation are synced.
+ *   3. INSERT (C1, D1) after job creation → should appear in Doris.
+ *   4. Verify A1, B1 are NOT in Doris (skipped by offset=latest).
+ *
+ * Scenario B — ALTER with wrong credentials:
+ *   1. Pause the job.
+ *   2. ALTER JOB with wrong password (same type/database/table, so the 
unmodifiable-property check passes).
+ *   3. Resume the job → fetchMeta fails → job auto-pauses with an error 
message.
+ *   4. Verify the error message reflects the credential failure.
+ *
+ * Scenario C — ALTER back to correct credentials and recover:
+ *   1. Alter back to correct credentials while still PAUSED.
+ *   2. Resume the job → job returns to RUNNING.
+ *   3. INSERT (E1, F1) after recovery → should appear in Doris (no data loss 
during pause).
+ *   4. Verify final data: only C1, D1, E1, F1 are present (A1, B1 still 
absent).
+ */
+suite("test_streaming_job_cdc_stream_postgres_latest_alter_cred",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+
+    def jobName   = "test_streaming_job_cdc_stream_pg_latest_alter_cred"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_pg_latest_alter_cred_tbl"
+    def pgDB       = "postgres"
+    def pgSchema   = "cdc_test"
+    def pgUser     = "postgres"
+    def pgPassword = "123456"
+    def pgTable    = "test_streaming_job_cdc_stream_pg_latest_alter_cred_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NOT NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+        String jdbcUrl = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}"
+
+        // ── Scenario A setup: pre-existing rows exist before job creation 
─────────────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // ── Scenario A: create job with offset=latest (skip snapshot) 
─────────────────
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = "${jdbcUrl}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"       = "${pgSchema}",
+                "table"        = "${pgTable}",
+                "offset"       = "latest"
+            )
+        """
+
+        // wait for job to reach RUNNING state (at least one task scheduled)
+        try {
+            Awaitility.await().atMost(60, SECONDS).pollInterval(1, 
SECONDS).until({
+                def status = sql """select status from jobs("type"="insert") 
where Name='${jobName}'"""
+                status.size() == 1 && status.get(0).get(0) == "RUNNING"
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            throw ex
+        }
+
+        // insert rows AFTER job creation — only these should appear in Doris
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('C1', 3)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('D1', 4)"""
+        }
+
+        // wait for C1 and D1 to arrive via binlog
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+                log.info("binlog rows (offset=latest): " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // offset=latest: A1 and B1 must NOT be in Doris (they existed before 
job creation)
+        def preExisting = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('A1', 'B1')"""
+        assert (preExisting.get(0).get(0) as int) == 0 :
+                "offset=latest: pre-existing rows A1/B1 must not be synced, 
but found ${preExisting.get(0).get(0)}"
+
+        qt_offset_latest_data """ SELECT * FROM ${currentDb}.${dorisTable} 
ORDER BY name """
+
+        // ── Scenario B: ALTER to wrong credentials → job should auto-pause 
────────────
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+            def s = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            s.size() == 1 && s.get(0).get(0) == "PAUSED"
+        })
+
+        // alter with wrong password; type/jdbc_url/database/schema/table 
remain unchanged
+        sql """
+            ALTER JOB ${jobName}
+            INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = "${jdbcUrl}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "wrong_password_for_test",
+                "database"     = "${pgDB}",
+                "schema"       = "${pgSchema}",
+                "table"        = "${pgTable}",
+                "offset"       = "latest"
+            )
+        """
+
+        sql """RESUME JOB where jobname = '${jobName}'"""
+
+        // job should re-pause due to credential failure
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+                log.info("status after wrong cred resume: " + s)
+                s.size() == 1 && s.get(0).get(0) == "PAUSED"
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            throw ex
+        }
+
+        def errorMsgAfterWrongCred = sql """select ErrorMsg from 
jobs("type"="insert") where Name='${jobName}'"""
+        log.info("error msg after wrong cred: " + errorMsgAfterWrongCred)
+        assert errorMsgAfterWrongCred.get(0).get(0) != null && 
!errorMsgAfterWrongCred.get(0).get(0).isEmpty() :
+                "ErrorMsg should be non-empty when wrong credentials are used"
+
+        // ── Scenario C: ALTER back to correct credentials and recover 
─────────────────
+        sql """
+            ALTER JOB ${jobName}
+            INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = "${jdbcUrl}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"       = "${pgSchema}",
+                "table"        = "${pgTable}",
+                "offset"       = "latest"
+            )
+        """
+
+        sql """RESUME JOB where jobname = '${jobName}'"""
+
+        // verify job transitions back to RUNNING
+        try {
+            Awaitility.await().atMost(60, SECONDS).pollInterval(1, 
SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+                log.info("status after correct cred resume: " + s)
+                s.size() == 1 && s.get(0).get(0) == "RUNNING"
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            throw ex
+        }
+
+        // insert rows after recovery; they should be consumed
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('E1', 5)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('F1', 6)"""
+        }
+
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('E1', 'F1')"""
+                log.info("rows after credential recovery: " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // final state: only post-creation rows (C1, D1, E1, F1) — A1/B1 
absent throughout
+        qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        def finalJobInfo = sql """select status, FailedTaskCount, ErrorMsg
+                                  from jobs("type"="insert") where 
Name='${jobName}'"""
+        log.info("final job info: " + finalJobInfo)
+        assert finalJobInfo.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${dorisTable} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
new file mode 100644
index 00000000000..d1f9263d7ac
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test pause and resume of a streaming INSERT job using cdc_stream TVF for 
PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase: pre-existing rows (A1, B1) are synced via full 
snapshot.
+ *   2. Binlog phase begins: INSERT (C1, D1) are applied; job enters steady 
binlog state.
+ *   3. Job is paused; verify status stays PAUSED and currentOffset / 
endOffset are non-empty.
+ *   4. While paused: INSERT (E1, F1) and UPDATE C1.age → 30 into the PG 
source table.
+ *   5. Job is resumed; verify status transitions to RUNNING.
+ *   6. Verify all rows inserted before and during pause eventually appear in 
Doris (no data loss).
+ *   7. Verify FailedTaskCount == 0 and no error message after resume.
+ */
suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
    // Identifiers used throughout; the PG source table and the Doris sink
    // table are created fresh here and dropped again at the end.
    def jobName    = "test_streaming_job_cdc_stream_pg_pause_resume"
    def currentDb  = (sql "select database()")[0][0]
    def dorisTable = "test_streaming_job_cdc_stream_pg_pause_resume_tbl"
    def pgDB       = "postgres"
    def pgSchema   = "cdc_test"
    def pgUser     = "postgres"
    def pgPassword = "123456"
    def pgTable    = "test_streaming_job_cdc_stream_pg_pause_resume_src"

    // Start from a clean slate.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${dorisTable} force"""

    // UNIQUE key model so that source-side UPDATEs overwrite rows in Doris.
    sql """
        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
            `name` varchar(200) NOT NULL,
            `age`  int NULL
        ) ENGINE=OLAP
        UNIQUE KEY(`name`)
        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
    """

    // Guard clause: the rest of the test needs the external JDBC docker env.
    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        return
    }

    String pg_port       = context.config.otherConfigs.get("pg_14_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String s3_endpoint   = getS3Endpoint()
    String bucket        = getS3BucketName()
    String driver_url    = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
    def pgJdbcUrl        = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}"

    // Dump job / task state when an await times out, to ease debugging.
    def logJob = {
        log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
    }
    def logTasks = {
        log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
    }

    // Poll until exactly the two named rows are visible in the Doris table.
    def awaitTwoRows = { String n1, String n2, int timeoutSec, String label ->
        try {
            Awaitility.await().atMost(timeoutSec, SECONDS).pollInterval(2, SECONDS).until({
                def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name IN ('${n1}', '${n2}')"""
                log.info(label + ": " + rows)
                (rows.get(0).get(0) as int) == 2
            })
        } catch (Exception ex) {
            logJob()
            logTasks()
            throw ex
        }
    }

    // Poll until the job reports the expected status.
    def awaitStatus = { String expected, int timeoutSec, String label ->
        try {
            Awaitility.await().atMost(timeoutSec, SECONDS).pollInterval(1, SECONDS).until({
                def status = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
                log.info(label + ": " + status)
                status.size() == 1 && status.get(0).get(0) == expected
            })
        } catch (Exception ex) {
            logJob()
            throw ex
        }
    }

    // ── Phase 1: seed the source table with the snapshot rows A1/B1 ───────────────
    connect(pgUser, pgPassword, pgJdbcUrl) {
        sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
        sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
                  "name" varchar(200) PRIMARY KEY,
                  "age"  int2
              )"""
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('A1', 1)"""
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('B1', 2)"""
    }

    // ── Phase 2: create the streaming job; offset=initial → snapshot then binlog ──
    sql """
        CREATE JOB ${jobName}
        ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
        SELECT name, age FROM cdc_stream(
            "type"         = "postgres",
            "jdbc_url"     = "${pgJdbcUrl}",
            "driver_url"   = "${driver_url}",
            "driver_class" = "org.postgresql.Driver",
            "user"         = "${pgUser}",
            "password"     = "${pgPassword}",
            "database"     = "${pgDB}",
            "schema"       = "${pgSchema}",
            "table"        = "${pgTable}",
            "offset"       = "initial"
        )
    """

    // Wait for the snapshot phase to land A1 and B1.
    awaitTwoRows('A1', 'B1', 300, "snapshot rows")
    qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """

    // ── Phase 3: incremental rows flow through the binlog phase ───────────────────
    connect(pgUser, pgPassword, pgJdbcUrl) {
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('C1', 3)"""
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('D1', 4)"""
    }
    awaitTwoRows('C1', 'D1', 120, "binlog rows before pause")

    // ── Phase 4: offsets must be populated once the job is in binlog state ────────
    def jobInfoBeforePause = sql """select currentOffset, endOffset, FailedTaskCount, ErrorMsg
                                    from jobs("type"="insert") where Name='${jobName}'"""
    log.info("job info before pause: " + jobInfoBeforePause)
    def currentOffsetBeforePause = jobInfoBeforePause.get(0).get(0) as String
    def failedCountBeforePause   = jobInfoBeforePause.get(0).get(2)
    assert currentOffsetBeforePause != null && !currentOffsetBeforePause.isEmpty() :
            "currentOffset should be non-empty in binlog state, got: ${currentOffsetBeforePause}"
    assert currentOffsetBeforePause.contains("binlog-split") :
            "currentOffset should contain 'binlog-split' in binlog state, got: ${currentOffsetBeforePause}"
    assert (failedCountBeforePause as int) == 0 : "FailedTaskCount should be 0 before pause"

    // ── Phase 5: pause the job and verify the state sticks ────────────────────────
    sql """PAUSE JOB where jobname = '${jobName}'"""
    awaitStatus("PAUSED", 30, "job status after pause")

    // PAUSED must be stable for a few seconds — the scheduler must not auto-resume.
    sleep(5000)
    def pausedStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
    assert pausedStatus.get(0).get(0) == "PAUSED" : "Job should remain PAUSED, got: ${pausedStatus.get(0).get(0)}"

    // Offsets stay reported while paused.
    def jobInfoWhilePaused = sql """select currentOffset, endOffset from jobs("type"="insert") where Name='${jobName}'"""
    log.info("job info while paused: " + jobInfoWhilePaused)
    def currentOffsetWhilePaused = jobInfoWhilePaused.get(0).get(0) as String
    assert currentOffsetWhilePaused != null && !currentOffsetWhilePaused.isEmpty() :
            "currentOffset should still be non-empty while paused"

    // ── Phase 6: write to the source while the job is paused ──────────────────────
    connect(pgUser, pgPassword, pgJdbcUrl) {
        // new rows added while paused
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('E1', 5)"""
        sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('F1', 6)"""
        // update an existing row while paused; after resume Doris (UNIQUE) should reflect new value
        sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 30 WHERE name = 'C1'"""
    }

    // ── Phase 7: resume and verify the transition back to RUNNING ─────────────────
    sql """RESUME JOB where jobname = '${jobName}'"""
    awaitStatus("RUNNING", 60, "job status after resume")

    // ── Phase 8: no data loss — rows written while paused must arrive ─────────────
    awaitTwoRows('E1', 'F1', 120, "rows inserted while paused")

    // The UPDATE on C1 must also be replayed (age=30 in Doris).
    try {
        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
            def rows = sql """SELECT age FROM ${currentDb}.${dorisTable} WHERE name = 'C1'"""
            log.info("C1 age after resume: " + rows)
            rows.size() == 1 && (rows.get(0).get(0) as int) == 30
        })
    } catch (Exception ex) {
        logJob()
        logTasks()
        throw ex
    }

    // ── Phase 9: final correctness checks ─────────────────────────────────────────
    qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """

    def jobInfoAfterResume = sql """select status, currentOffset, FailedTaskCount, ErrorMsg
                                    from jobs("type"="insert") where Name='${jobName}'"""
    log.info("job info after resume: " + jobInfoAfterResume)
    assert jobInfoAfterResume.get(0).get(0) == "RUNNING" : "Job should be RUNNING after resume"
    def currentOffsetAfterResume = jobInfoAfterResume.get(0).get(1) as String
    assert currentOffsetAfterResume != null && !currentOffsetAfterResume.isEmpty() :
            "currentOffset should be non-empty after resume"
    assert (jobInfoAfterResume.get(0).get(2) as int) == 0 : "FailedTaskCount should be 0 after resume"
    def errorMsgAfterResume = jobInfoAfterResume.get(0).get(3) as String
    assert errorMsgAfterResume == null || errorMsgAfterResume.isEmpty() :
            "ErrorMsg should be empty after successful resume, got: ${errorMsgAfterResume}"

    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${dorisTable} force"""
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to