This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ccfa80ff3d [Improve](StreamingJob)  Support schema change for 
PostgreSQL streaming job (#61182)
8ccfa80ff3d is described below

commit 8ccfa80ff3da89a647c4311fe85e2b174cea7bd5
Author: wudi <[email protected]>
AuthorDate: Thu Mar 12 15:08:29 2026 +0800

    [Improve](StreamingJob)  Support schema change for PostgreSQL streaming job 
(#61182)
    
    ### What problem does this PR solve?
    
    #### Summary
    
    Added Schema Change support to the CDC pipeline of PostgreSQL Streaming
    Jobs, enabling Doris target tables to automatically follow DDL changes
    (ADD COLUMN / DROP COLUMN) from upstream PostgreSQL without manual
    intervention.
    
    #### Background
    
    Unlike MySQL Binlog, PostgreSQL WAL does not contain explicit DDL
    events. Schema Changes can only be detected by diffing the afterSchema
    field in the DML record with the locally cached schema.
    
    #### Implementation
    
    Detection process (three stages):
    
    1. First diff (memory, name comparison): Compares the afterSchema field
    name of the current DML record with the cached tableSchemas. If a
    difference is found, proceeds to the next step.
    
    2. JDBC refresh: Fetches the current real-time schema via PostgreSQL
    JDBC (fresh). 3. Second diff (exact comparison): Based on the
    afterSchema (not fresh), it only processes column changes already
    perceived in the current DML record, avoiding premature execution of
    subsequent DDL changes for which no DML record has yet been generated.
    
    - ADD only → generates ALTER TABLE … ADD COLUMN
    
    - DROP only → generates ALTER TABLE … DROP COLUMN
    
    - ADD + DROP simultaneously → Rename Guard: If it's determined to be a
    potential column renaming, no DDL is executed; only the cache is
    updated, and a WARN log is printed prompting the user to manually
    execute RENAME in Doris.
    
    Idempotency: SchemaChangeManager silently handles "Can not add column
    which already exists" / "Column does not exist" errors, ensuring retry
    safety.
    
    #### Limitations
    
    - RENAME COLUMN not supported: If ADD + DROP simultaneously triggers
    Rename Guard, the DDL is skipped, requiring the user to manually execute
    ALTER TABLE … RENAME COLUMN in Doris. Data flow then automatically
    resumes. - MODIFY COLUMN type not supported: Type changes are not
    visible during the name diff stage, no DDL is generated, and the Doris
    column type remains unchanged.
    - MODIFY COLUMN is not supported: Column type changes are ignored by
    design. Since type modifications do not change column names, they cannot
    be detected during the name diff stage, and therefore no DDL will be
    generated.
---
 .../doris/job/cdc/request/CommitOffsetRequest.java |  17 +-
 .../job/cdc/request/JobBaseRecordRequest.java      |   1 +
 .../insert/streaming/StreamingInsertJob.java       |   4 +
 .../insert/streaming/StreamingMultiTblTask.java    |   6 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |   4 +
 .../apache/doris/cdcclient/common/Constants.java   |   2 +
 .../apache/doris/cdcclient/common/DorisType.java   |  47 +++
 .../cdcclient/service/PipelineCoordinator.java     |  61 +++-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java |  25 +-
 .../doris/cdcclient/sink/HttpPutBuilder.java       |   3 +-
 .../deserialize/DebeziumJsonDeserializer.java      |  18 +-
 .../source/deserialize/DeserializeResult.java      |  92 ++++++
 .../deserialize/MySqlDebeziumJsonDeserializer.java |  66 ++++
 .../PostgresDebeziumJsonDeserializer.java          | 248 +++++++++++++++
 .../deserialize/SourceRecordDeserializer.java      |   5 +
 .../source/reader/AbstractCdcSourceReader.java     | 171 ++++++++++
 .../source/reader/JdbcIncrementalSourceReader.java |  24 +-
 .../cdcclient/source/reader/SourceReader.java      |  18 +-
 .../source/reader/mysql/MySqlSourceReader.java     |  21 +-
 .../reader/postgres/PostgresSourceReader.java      |  32 ++
 .../org/apache/doris/cdcclient/utils/HttpUtil.java |   4 +
 .../doris/cdcclient/utils/SchemaChangeHelper.java  | 291 +++++++++++++++++
 .../doris/cdcclient/utils/SchemaChangeManager.java | 149 +++++++++
 .../cdcclient/utils/SchemaChangeHelperTest.java    | 194 ++++++++++++
 .../cdc/test_streaming_postgres_job_sc.out         |  32 ++
 .../test_streaming_postgres_job_sc_advanced.out    |  31 ++
 .../cdc/test_streaming_postgres_job_sc.groovy      | 269 ++++++++++++++++
 .../test_streaming_postgres_job_sc_advanced.groovy | 344 +++++++++++++++++++++
 28 files changed, 2132 insertions(+), 47 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
index 3d2d221ea49..747e82b4fee 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
@@ -22,12 +22,10 @@ import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
-import lombok.ToString;
 
 @Getter
 @Setter
 @NoArgsConstructor
-@ToString
 @AllArgsConstructor
 @Builder
 public class CommitOffsetRequest {
@@ -38,4 +36,19 @@ public class CommitOffsetRequest {
     public long filteredRows;
     public long loadedRows;
     public long loadBytes;
+    public String tableSchemas;
+
+    @Override
+    public String toString() {
+        return "CommitOffsetRequest{"
+                + "jobId=" + jobId
+                + ", taskId=" + taskId
+                + ", offset='" + offset + "'"
+                + ", scannedRows=" + scannedRows
+                + ", filteredRows=" + filteredRows
+                + ", loadedRows=" + loadedRows
+                + ", loadBytes=" + loadBytes
+                + ", tableSchemasSize=" + (tableSchemas != null ? 
tableSchemas.length() : 0)
+                + "}";
+    }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
index 282913e2dd2..27784b1701b 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
@@ -28,4 +28,5 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 public abstract class JobBaseRecordRequest extends JobBaseConfig {
     protected Map<String, Object> meta;
+    protected String tableSchemas;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 189656fa48d..e8d168eaa43 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1159,6 +1159,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                     JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
                     op.setHasMoreData(false);
                 }
+                if (offsetRequest.getTableSchemas() != null) {
+                    JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
+                    op.setTableSchemas(offsetRequest.getTableSchemas());
+                }
                 persistOffsetProviderIfNeed();
                 log.info("Streaming multi table job {} task {} commit offset 
successfully, offset: {}",
                         getJobId(), offsetRequest.getTaskId(), 
offsetRequest.getOffset());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index cf9a9b905be..45d2cf2ffbb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -198,6 +198,12 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
         request.setFrontendAddress(feAddr);
         request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+        if (offsetProvider instanceof JdbcSourceOffsetProvider) {
+            String schemas = ((JdbcSourceOffsetProvider) 
offsetProvider).getTableSchemas();
+            if (schemas != null) {
+                request.setTableSchemas(schemas);
+            }
+        }
         return request;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index d04245317b5..d8959086fa5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -87,6 +87,9 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     @SerializedName("bop")
     Map<String, String> binlogOffsetPersist;
 
+    @SerializedName("ts")
+    String tableSchemas;
+
     volatile boolean hasMoreData = true;
 
     public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, 
Map<String, String> sourceProperties) {
@@ -355,6 +358,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
                     JdbcSourceOffsetProvider.class);
             this.binlogOffsetPersist = 
replayFromPersist.getBinlogOffsetPersist();
             this.chunkHighWatermarkMap = 
replayFromPersist.getChunkHighWatermarkMap();
+            this.tableSchemas = replayFromPersist.getTableSchemas();
             log.info("Replaying offset provider for job {}, binlogOffset size 
{}, chunkHighWatermark size {}",
                     getJobId(),
                     binlogOffsetPersist == null ? 0 : 
binlogOffsetPersist.size(),
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 04ec118a6c2..953903a8032 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -23,4 +23,6 @@ public class Constants {
 
     // Debezium default properties
     public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
+
+    public static final String DORIS_TARGET_DB = "doris_target_db";
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
new file mode 100644
index 00000000000..3aad97bb0cd
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.common;
+
+public class DorisType {
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String LARGEINT = "LARGEINT";
+    // largeint is bigint unsigned in information_schema.COLUMNS
+    public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DATE = "DATE";
+    public static final String DATETIME = "DATETIME";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String HLL = "HLL";
+    public static final String BITMAP = "BITMAP";
+    public static final String ARRAY = "ARRAY";
+    public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+    public static final String VARIANT = "VARIANT";
+    public static final String IPV4 = "IPV4";
+    public static final String IPV6 = "IPV6";
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 614c506619f..97aa4b7f5f2 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -21,8 +21,10 @@ import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.common.Env;
 import org.apache.doris.cdcclient.model.response.RecordWithMeta;
 import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
@@ -166,11 +168,12 @@ public class PipelineCoordinator {
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
+                    DeserializeResult result =
                             sourceReader.deserialize(fetchRecord.getConfig(), 
element);
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                    if (result.getType() == DeserializeResult.Type.DML
+                            && !CollectionUtils.isEmpty(result.getRecords())) {
                         recordCount++;
-                        recordResponse.getRecords().addAll(serializedRecords);
+                        
recordResponse.getRecords().addAll(result.getRecords());
                         hasReceivedData = true;
                         lastMessageIsHeartbeat = false;
                     }
@@ -236,21 +239,34 @@ public class PipelineCoordinator {
      * <p>Heartbeat events will carry the latest offset.
      */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
+        // Extract connection parameters up front for use throughout this 
method
+        String feAddr = writeRecordRequest.getFrontendAddress();
+        String targetDb = writeRecordRequest.getTargetDb();
+        String token = writeRecordRequest.getToken();
+
+        // Enrich the source config with the Doris target DB so the 
deserializer can build
+        // DDL referencing the correct Doris database, not the upstream source 
database.
+        Map<String, String> deserializeContext = new 
HashMap<>(writeRecordRequest.getConfig());
+        deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
+
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
         int heartbeatCount = 0;
         SplitReadResult readResult = null;
+        boolean hasExecuteDDL = false;
+        boolean isSnapshotSplit = false;
         try {
             // 1. submit split async
             readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
             batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
 
-            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
+            isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
             boolean shouldStop = false;
             boolean lastMessageIsHeartbeat = false;
+
             LOG.info(
                     "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
                     writeRecordRequest.getJobId(),
@@ -309,15 +325,22 @@ public class PipelineCoordinator {
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
-                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
-                        String database = writeRecordRequest.getTargetDb();
+                    DeserializeResult result =
+                            sourceReader.deserialize(deserializeContext, 
element);
+
+                    if (result.getType() == 
DeserializeResult.Type.SCHEMA_CHANGE) {
+                        // Flush pending data before DDL
+                        batchStreamLoad.forceFlush();
+                        SchemaChangeManager.executeDdls(feAddr, targetDb, 
token, result.getDdls());
+                        hasExecuteDDL = true;
+                        
sourceReader.applySchemaChange(result.getUpdatedSchemas());
+                        lastMessageIsHeartbeat = false;
+                    }
+                    if (!CollectionUtils.isEmpty(result.getRecords())) {
                         String table = extractTable(element);
-                        for (String record : serializedRecords) {
+                        for (String record : result.getRecords()) {
                             scannedRows++;
-                            batchStreamLoad.writeRecord(database, table, 
record.getBytes());
+                            batchStreamLoad.writeRecord(targetDb, table, 
record.getBytes());
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
@@ -346,8 +369,22 @@ public class PipelineCoordinator {
         // The offset must be reset before commitOffset to prevent the next 
taskId from being create
         // by the fe.
         batchStreamLoad.resetTaskId();
+
+        // Serialize tableSchemas back to FE when:
+        // 1. A DDL was executed (in-memory schema was updated), OR
+        // 2. It's a binlog split AND FE had no schema (FE tableSchemas was 
null) — this covers
+        //    incremental-only startup and the first binlog round after 
snapshot completes.
+        String tableSchemas = null;
+        boolean feHadNoSchema = writeRecordRequest.getTableSchemas() == null;
+        if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
+            tableSchemas = sourceReader.serializeTableSchemas();
+        }
         batchStreamLoad.commitOffset(
-                currentTaskId, metaResponse, scannedRows, 
batchStreamLoad.getLoadStatistic());
+                currentTaskId,
+                metaResponse,
+                scannedRows,
+                batchStreamLoad.getLoadStatistic(),
+                tableSchemas);
     }
 
     public static boolean isHeartbeatEvent(SourceRecord record) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 92a2f9db2b6..72e84c4413c 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Getter;
 import lombok.Setter;
@@ -503,7 +504,8 @@ public class DorisBatchStreamLoad implements Serializable {
             String taskId,
             List<Map<String, String>> meta,
             long scannedRows,
-            LoadStatistic loadStatistic) {
+            LoadStatistic loadStatistic,
+            String tableSchemas) {
         try {
             String url = String.format(COMMIT_URL_PATTERN, frontendAddress, 
targetDb);
             CommitOffsetRequest commitRequest =
@@ -515,6 +517,7 @@ public class DorisBatchStreamLoad implements Serializable {
                             .filteredRows(loadStatistic.getFilteredRows())
                             .loadedRows(loadStatistic.getLoadedRows())
                             .loadBytes(loadStatistic.getLoadBytes())
+                            .tableSchemas(tableSchemas)
                             .build();
             String param = OBJECT_MAPPER.writeValueAsString(commitRequest);
 
@@ -527,7 +530,11 @@ public class DorisBatchStreamLoad implements Serializable {
                             .commit()
                             .setEntity(new StringEntity(param));
 
-            LOG.info("commit offset for jobId {} taskId {}, params {}", jobId, 
taskId, param);
+            LOG.info(
+                    "commit offset for jobId {} taskId {}, commitRequest {}",
+                    jobId,
+                    taskId,
+                    commitRequest.toString());
             Throwable resEx = null;
             int retry = 0;
             while (retry <= RETRY) {
@@ -541,11 +548,15 @@ public class DorisBatchStreamLoad implements Serializable 
{
                                         : "";
                         LOG.info("commit result {}", responseBody);
                         if (statusCode == 200) {
-                            LOG.info("commit offset for jobId {} taskId {}", 
jobId, taskId);
-                            // A 200 response indicates that the request was 
successful, and
-                            // information such as offset and statistics may 
have already been
-                            // updated. Retrying may result in repeated 
updates.
-                            return;
+                            JsonNode root = 
OBJECT_MAPPER.readTree(responseBody);
+                            JsonNode code = root.get("code");
+                            if (code != null && code.asInt() == 0) {
+                                LOG.info(
+                                        "commit offset for jobId {} taskId {} 
successfully",
+                                        jobId,
+                                        taskId);
+                                return;
+                            }
                         }
                         LOG.error(
                                 "commit offset failed with {}, reason {}, to 
retry",
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
index 3abd9eaabc2..d24f61397a2 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.doris.cdcclient.sink;
 
 import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.HttpUtil;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.MapUtils;
@@ -69,7 +70,7 @@ public class HttpPutBuilder {
     }
 
     public HttpPutBuilder addTokenAuth(String token) {
-        header.put(HttpHeaders.AUTHORIZATION, "Basic YWRtaW46");
+        header.put(HttpHeaders.AUTHORIZATION, HttpUtil.getAuthHeader());
         header.put("token", token);
         return this;
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 556c186b5d4..065a3da2c09 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -38,7 +38,6 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -58,6 +57,8 @@ import io.debezium.data.VariableScaleDecimal;
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.NanoTime;
@@ -65,17 +66,19 @@ import io.debezium.time.NanoTimestamp;
 import io.debezium.time.Time;
 import io.debezium.time.Timestamp;
 import io.debezium.time.ZonedTimestamp;
+import lombok.Getter;
 import lombok.Setter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** SourceRecord ==> [{},{}] */
+/** SourceRecord ==> DeserializeResult */
 public class DebeziumJsonDeserializer
-        implements SourceRecordDeserializer<SourceRecord, List<String>> {
+        implements SourceRecordDeserializer<SourceRecord, DeserializeResult> {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumJsonDeserializer.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
+    @Getter @Setter protected Map<TableId, TableChanges.TableChange> 
tableSchemas;
 
     public DebeziumJsonDeserializer() {}
 
@@ -86,15 +89,14 @@ public class DebeziumJsonDeserializer
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> context, SourceRecord 
record)
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
             throws IOException {
         if (RecordUtils.isDataChangeRecord(record)) {
             LOG.trace("Process data change record: {}", record);
-            return deserializeDataChangeRecord(record);
-        } else if (RecordUtils.isSchemaChangeEvent(record)) {
-            return Collections.emptyList();
+            List<String> rows = deserializeDataChangeRecord(record);
+            return DeserializeResult.dml(rows);
         } else {
-            return Collections.emptyList();
+            return DeserializeResult.empty();
         }
     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
new file mode 100644
index 00000000000..c1e69c77e5a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
+/** Result of deserializing a SourceRecord. */
+public class DeserializeResult {
+
+    public enum Type {
+        DML,
+        SCHEMA_CHANGE,
+        EMPTY
+    }
+
+    private final Type type;
+    private final List<String> records;
+    private final List<String> ddls;
+    private final Map<TableId, TableChanges.TableChange> updatedSchemas;
+
+    private DeserializeResult(
+            Type type,
+            List<String> records,
+            List<String> ddls,
+            Map<TableId, TableChanges.TableChange> updatedSchemas) {
+        this.type = type;
+        this.records = records;
+        this.ddls = ddls;
+        this.updatedSchemas = updatedSchemas;
+    }
+
+    public static DeserializeResult dml(List<String> records) {
+        return new DeserializeResult(Type.DML, records, null, null);
+    }
+
+    public static DeserializeResult schemaChange(
+            List<String> ddls, Map<TableId, TableChanges.TableChange> 
updatedSchemas) {
+        return new DeserializeResult(
+                Type.SCHEMA_CHANGE, Collections.emptyList(), ddls, 
updatedSchemas);
+    }
+
+    /**
+     * Schema change result that also carries DML records from the triggering 
record. The
+     * coordinator should execute DDLs first, then write the records.
+     */
+    public static DeserializeResult schemaChange(
+            List<String> ddls,
+            Map<TableId, TableChanges.TableChange> updatedSchemas,
+            List<String> records) {
+        return new DeserializeResult(Type.SCHEMA_CHANGE, records, ddls, 
updatedSchemas);
+    }
+
+    public static DeserializeResult empty() {
+        return new DeserializeResult(Type.EMPTY, Collections.emptyList(), 
null, null);
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public List<String> getRecords() {
+        return records;
+    }
+
+    public List<String> getDdls() {
+        return ddls;
+    }
+
+    public Map<TableId, TableChanges.TableChange> getUpdatedSchemas() {
+        return updatedSchemas;
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..b64c7186983
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MySQL-specific deserializer that handles DDL schema change events.
+ *
+ * <p>When a schema change event is detected, it parses the HistoryRecord, 
computes the diff against
+ * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a 
SCHEMA_CHANGE result.
+ */
+public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class);
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private String targetDb;
+
+    @Override
+    public void init(Map<String, String> props) {
+        super.init(props);
+        this.targetDb = props.get(DataSourceConfigKeys.DATABASE);
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (RecordUtils.isSchemaChangeEvent(record)) {
+            return handleSchemaChangeEvent(record, context);
+        }
+        return super.deserialize(context, record);
+    }
+
+    private DeserializeResult handleSchemaChangeEvent(
+            SourceRecord record, Map<String, String> context) {
+        // todo: record has mysql ddl, need to convert doris ddl
+        return DeserializeResult.empty();
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..2dc2310054b
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP 
column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema 
detection is done by
+ * comparing the "after" struct field names in each DML record against the 
known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused 
by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column 
add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL 
via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER 
TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer 
{
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+    /**
+     * Callback to fetch the current PG table schema for a single table via 
JDBC. Injected by {@link
+     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} 
after initialization.
+     */
+    private transient Function<TableId, TableChanges.TableChange> 
pgSchemaRefresher;
+
+    public void setPgSchemaRefresher(Function<TableId, 
TableChanges.TableChange> refresher) {
+        this.pgSchemaRefresher = refresher;
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (!RecordUtils.isDataChangeRecord(record)) {
+            return DeserializeResult.empty();
+        }
+
+        Schema valueSchema = record.valueSchema();
+        if (valueSchema == null) {
+            return super.deserialize(context, record);
+        }
+
+        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+        if (afterField == null) {
+            return super.deserialize(context, record);
+        }
+
+        Schema afterSchema = afterField.schema();
+        TableId tableId = extractTableId(record);
+        TableChanges.TableChange stored = tableSchemas != null ? 
tableSchemas.get(tableId) : null;
+
+        // No baseline schema available — cannot detect changes, fall through 
to normal
+        // deserialization
+        if (stored == null || stored.getTable() == null) {
+            LOG.debug(
+                    "No stored schema for table {}, skipping schema change 
detection.",
+                    tableId.identifier());
+            return super.deserialize(context, record);
+        }
+
+        // First pass: name-only diff — fast, in-memory, no type comparison, 
no false positives
+        SchemaChangeHelper.SchemaDiff nameDiff =
+                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+        if (nameDiff.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        Preconditions.checkNotNull(
+                pgSchemaRefresher,
+                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG 
schema for change detection.");
+
+        // the last fresh schema
+        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+        if (fresh == null || fresh.getTable() == null) {
+            // Cannot proceed: DDL must be executed before the triggering DML 
record is written,
+            // otherwise new column data in this record would be silently 
dropped.
+            // Throwing here causes the batch to be retried from the same 
offset.
+            throw new IOException(
+                    "Failed to fetch fresh schema for table "
+                            + tableId.identifier()
+                            + "; cannot apply schema change safely. Will 
retry.");
+        }
+
+        // Second diff: use afterSchema as the source of truth for which 
columns the current WAL
+        // record is aware of. Only process additions/drops visible in 
afterSchema — columns that
+        // exist in fresh (JDBC) but are absent from afterSchema belong to a 
later DDL that has not
+        // yet produced a DML record, and will be processed when that DML 
record arrives.
+        //
+        // pgAdded: present in afterSchema but absent from stored → look up 
Column in fresh for
+        //          accurate PG type metadata. If fresh doesn't have the 
column yet (shouldn't
+        //          happen normally), skip it.
+        // pgDropped: present in stored but absent from afterSchema.
+        List<Column> pgAdded = new ArrayList<>();
+        List<String> pgDropped = new ArrayList<>();
+
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                Column freshCol = 
fresh.getTable().columnWithName(field.name());
+                if (freshCol != null) {
+                    pgAdded.add(freshCol);
+                }
+            }
+        }
+
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                pgDropped.add(col.name());
+            }
+        }
+
+        // Second diff is empty: nameDiff was a false positive (PG schema 
unchanged vs stored).
+        // This happens when pgSchemaRefresher returns a schema ahead of the 
current WAL position
+        // (e.g. a later DDL was already applied in PG while we're still 
consuming older DML
+        // records).
+        // No DDL needed, no tableSchema update, no extra stream load — just 
process the DML
+        // normally.
+        if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        // Build updatedSchemas from fresh filtered to afterSchema columns 
only, so that the stored
+        // cache does not jump ahead to include columns not yet seen by any 
DML record. Those
+        // unseen columns will trigger their own schema change when their 
first DML record arrives.
+        TableEditor editor = fresh.getTable().edit();
+        for (Column col : fresh.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                editor.removeColumn(col.name());
+            }
+        }
+        TableChanges.TableChange filteredChange =
+                new 
TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
+        Map<TableId, TableChanges.TableChange> updatedSchemas = new 
HashMap<>();
+        updatedSchemas.put(tableId, filteredChange);
+
+        // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip 
DDL to avoid data loss.
+        // Users must manually RENAME the column in Doris.
+        if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
+            LOG.warn(
+                    "[SCHEMA-CHANGE-SKIPPED] Table: {}\n"
+                            + "Potential RENAME detected (simultaneous 
DROP+ADD).\n"
+                            + "Dropped columns: {}\n"
+                            + "Added columns:   {}\n"
+                            + "No DDL emitted to prevent data loss.\n"
+                            + "Action required: manually RENAME column(s) in 
Doris,"
+                            + " then data will resume.",
+                    tableId.identifier(),
+                    pgDropped,
+                    
pgAdded.stream().map(Column::name).collect(Collectors.toList()));
+            List<String> dmlRecords = super.deserialize(context, 
record).getRecords();
+            return DeserializeResult.schemaChange(
+                    Collections.emptyList(), updatedSchemas, dmlRecords);
+        }
+
+        // Generate DDLs using accurate PG column types
+        String db = context.get(Constants.DORIS_TARGET_DB);
+        List<String> ddls = new ArrayList<>();
+
+        for (String colName : pgDropped) {
+            ddls.add(SchemaChangeHelper.buildDropColumnSql(db, 
tableId.table(), colName));
+        }
+
+        for (Column col : pgAdded) {
+            String colType = SchemaChangeHelper.columnToDorisType(col);
+            String nullable = col.isOptional() ? "" : " NOT NULL";
+            // pgAdded only contains columns present in afterSchema, so field 
lookup is safe.
+            // afterSchema.defaultValue() returns an already-deserialized Java 
object
+            // (e.g. String "hello", Integer 42) — no PG SQL cast suffix to 
strip.
+            // PG WAL DML records do not carry column comment metadata.
+            Object defaultObj = 
afterSchema.field(col.name()).schema().defaultValue();
+            ddls.add(
+                    SchemaChangeHelper.buildAddColumnSql(
+                            db,
+                            tableId.table(),
+                            col.name(),
+                            colType + nullable,
+                            defaultObj != null ? String.valueOf(defaultObj) : 
null,
+                            null));
+        }
+
+        List<String> dmlRecords = super.deserialize(context, 
record).getRecords();
+
+        LOG.info(
+                "Postgres schema change detected for table {}: added={}, 
dropped={}. DDLs: {}",
+                tableId.identifier(),
+                
pgAdded.stream().map(Column::name).collect(Collectors.toList()),
+                pgDropped,
+                ddls);
+
+        return DeserializeResult.schemaChange(ddls, updatedSchemas, 
dmlRecords);
+    }
+
+    private TableId extractTableId(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        String schemaName = source.getString(SCHEMA_NAME_KEY);
+        String tableName = source.getString(TABLE_NAME_KEY);
+        return new TableId(null, schemaName, tableName);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
index f93567a230a..cc0a519da07 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
@@ -21,8 +21,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
 public interface SourceRecordDeserializer<T, C> extends Serializable {
     void init(Map<String, String> props);
 
     C deserialize(Map<String, String> context, T record) throws IOException;
+
+    default void setTableSchemas(Map<TableId, TableChanges.TableChange> 
tableSchemas) {}
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
new file mode 100644
index 00000000000..6ebf75a99aa
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+
+import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class providing common schema-tracking functionality for CDC 
source readers.
+ *
+ * <p>Handles serialization/deserialization of {@code tableSchemas} between FE 
and cdc_client, and
+ * provides a helper to load schemas from the incoming {@link 
JobBaseRecordRequest}.
+ */
+@Getter
+@Setter
+public abstract class AbstractCdcSourceReader implements SourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCdcSourceReader.class);
+
+    protected static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    protected SourceRecordDeserializer<SourceRecord, DeserializeResult> 
serializer;
+    protected Map<TableId, TableChanges.TableChange> tableSchemas;
+
+    /**
+     * Load tableSchemas from a JSON string (produced by {@link 
#serializeTableSchemas()}). Used
+     * when a binlog/stream split is resumed from FE-persisted state.
+     *
+     * <p>Format: {@code 
[{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    public void loadTableSchemasFromJson(String json) throws IOException {
+        if (json == null || json.isEmpty()) {
+            return;
+        }
+        JsonNode root = OBJECT_MAPPER.readTree(json);
+        Map<TableId, TableChanges.TableChange> map = new ConcurrentHashMap<>();
+        DocumentReader docReader = DocumentReader.defaultReader();
+        for (JsonNode entry : root) {
+            boolean uc = entry.path("uc").asBoolean(false);
+            TableId tableId = TableId.parse(entry.get("i").asText(), uc);
+            Document doc = 
docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c")));
+            TableChanges.TableChange change = 
FlinkJsonTableChangeSerializer.fromDocument(doc, uc);
+            map.put(tableId, change);
+        }
+        this.tableSchemas = map;
+        this.serializer.setTableSchemas(map);
+        LOG.info("Loaded {} table schemas from JSON", map.size());
+    }
+
+    /**
+     * Serialize current tableSchemas to a compact JSON string for FE 
persistence.
+     *
+     * <p>Stores the Debezium document as a nested JSON object (not a string) 
to avoid redundant
+     * escaping. Format: {@code
+     * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    @Override
+    public String serializeTableSchemas() {
+        if (tableSchemas == null || tableSchemas.isEmpty()) {
+            return null;
+        }
+        try {
+            DocumentWriter docWriter = DocumentWriter.defaultWriter();
+            ArrayNode result = OBJECT_MAPPER.createArrayNode();
+            for (Map.Entry<TableId, TableChanges.TableChange> e : 
tableSchemas.entrySet()) {
+                TableId tableId = e.getKey();
+                // useCatalogBeforeSchema: false when catalog is null but 
schema is set (e.g. PG)
+                boolean uc = 
SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
+                ObjectNode entry = OBJECT_MAPPER.createObjectNode();
+                entry.put("i", tableId.toDoubleQuotedString());
+                entry.put("uc", uc);
+                // parse compact doc JSON into a JsonNode so "c" is a nested 
object, not a string
+                entry.set(
+                        "c",
+                        OBJECT_MAPPER.readTree(
+                                
docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue()))));
+                result.add(entry);
+            }
+            return OBJECT_MAPPER.writeValueAsString(result);
+        } catch (Exception e) {
+            // Return null so the current batch is not failed — data keeps 
flowing and
+            // schema persistence will be retried on the next DDL or 
feHadNoSchema batch.
+            // For PostgreSQL this is safe: WAL records carry afterSchema so 
the next DML
+            // will re-trigger schema-change detection and self-heal.
+            // WARNING: for MySQL (schema change not yet implemented), 
returning null here
+            // is dangerous — MySQL binlog has no inline schema, so loading a 
stale
+            // pre-DDL schema from FE on the next task would cause column 
mismatches
+            // (flink-cdc#732). When MySQL schema change is implemented, this 
must throw
+            // instead of returning null to prevent committing the offset with 
a stale schema.
+            LOG.error(
+                    "Failed to serialize tableSchemas, schema will not be 
persisted to FE"
+                            + " in this cycle. Will retry on next DDL or 
batch.",
+                    e);
+            return null;
+        }
+    }
+
+    /** Apply schema changes to in-memory tableSchemas and notify the 
serializer. */
+    @Override
+    public void applySchemaChange(Map<TableId, TableChanges.TableChange> 
updatedSchemas) {
+        if (updatedSchemas == null || updatedSchemas.isEmpty()) {
+            return;
+        }
+        if (tableSchemas == null) {
+            tableSchemas = new ConcurrentHashMap<>(updatedSchemas);
+        } else {
+            tableSchemas.putAll(updatedSchemas);
+        }
+        serializer.setTableSchemas(tableSchemas);
+    }
+
+    /**
+     * Load FE-persisted tableSchemas into memory from the incoming request.
+     *
+     * <p>FE's schema and offset are always committed together, so FE's schema 
always corresponds to
+     * the starting offset of the current batch. Loading it unconditionally 
ensures the deserializer
+     * uses the correct baseline — particularly critical for MySQL: Flink CDC 
only retains the
+     * latest schema in memory, so if a previous batch executed a DDL but 
failed to commit the
+     * offset, retrying from the pre-DDL offset with a stale post-DDL cache 
would cause
+     * schema-mismatch errors on every retry (see flink-cdc#732). PostgreSQL 
is unaffected by this
+     * because WAL records carry the schema at the time they were written, but 
loading FE's schema
+     * unconditionally is still correct: any re-detected DDL will be handled 
idempotently by {@link
+     * SchemaChangeManager}.
+     *
+     * <p>Call this at the start of preparing a binlog/stream split.
+     */
+    protected void tryLoadTableSchemasFromRequest(JobBaseRecordRequest 
baseReq) throws IOException {
+        loadTableSchemasFromJson(baseReq.getTableSchemas());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 36111d0fbf4..5b8e343faae 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -18,7 +18,7 @@
 package org.apache.doris.cdcclient.source.reader;
 
 import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -83,11 +83,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public abstract class JdbcIncrementalSourceReader implements SourceReader {
+public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
-    private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
-    private Map<TableId, TableChanges.TableChange> tableSchemas;
 
     // Support for multiple snapshot splits
     private List<
@@ -334,6 +332,22 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
     /** Prepare stream split */
     private SplitReadResult prepareStreamSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
+        // Load tableSchemas from FE if available (avoids re-discover on 
restart)
+        tryLoadTableSchemasFromRequest(baseReq);
+        // If still null (incremental-only startup, or snapshot→binlog 
transition where FE never
+        // persisted schema), do a JDBC discover so the deserializer has a 
baseline to diff against.
+        if (this.tableSchemas == null) {
+            LOG.info(
+                    "No tableSchemas available for stream split, discovering 
via JDBC for job {}",
+                    baseReq.getJobId());
+            Map<TableId, TableChanges.TableChange> discovered = 
getTableSchemas(baseReq);
+            this.tableSchemas = new 
java.util.concurrent.ConcurrentHashMap<>(discovered);
+            this.serializer.setTableSchemas(this.tableSchemas);
+            LOG.info(
+                    "Discovered {} table schema(s) for job {}",
+                    discovered.size(),
+                    baseReq.getJobId());
+        }
         Tuple2<SourceSplitBase, Boolean> splitFlag = 
createStreamSplit(offsetMeta, baseReq);
         this.streamSplit = splitFlag.f0.asStreamSplit();
         this.streamReader = getBinlogSplitReader(baseReq);
@@ -908,7 +922,7 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> config, SourceRecord 
element)
+    public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
         return serializer.deserialize(config, element);
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index 6c1a018dde3..fa4578d509b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cdcclient.source.reader;
 
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -32,6 +33,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
 /** Source Reader Interface */
 public interface SourceReader {
     String SPLIT_ID = "splitId";
@@ -75,7 +79,19 @@ public interface SourceReader {
     /** Called when closing */
     void close(JobBaseConfig jobConfig);
 
-    List<String> deserialize(Map<String, String> config, SourceRecord element) 
throws IOException;
+    DeserializeResult deserialize(Map<String, String> config, SourceRecord 
element)
+            throws IOException;
+
+    /**
+     * Apply schema changes to the in-memory tableSchemas. Called after schema 
change is executed on
+     * Doris.
+     */
+    default void applySchemaChange(Map<TableId, TableChanges.TableChange> 
updatedSchemas) {}
+
+    /** Serialize current tableSchemas to JSON for persistence via 
commitOffset. */
+    default String serializeTableSchemas() {
+        return null;
+    }
 
     /**
      * Commits the given offset with the source database. Used by some source 
like Postgres to
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 83c9b349e4e..11e5007894d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.cdcclient.source.reader.mysql;
 
-import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import 
org.apache.doris.cdcclient.source.deserialize.MySqlDebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
 import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
-import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.source.reader.SplitRecords;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -62,7 +62,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
-import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -110,13 +109,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public class MySqlSourceReader implements SourceReader {
+public class MySqlSourceReader extends AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
-    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
-            new FlinkJsonTableChangeSerializer();
-    private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
-    private Map<TableId, TableChanges.TableChange> tableSchemas;
 
     // Support for multiple snapshot splits with Round-Robin polling
     private List<
@@ -135,7 +130,7 @@ public class MySqlSourceReader implements SourceReader {
     private MySqlBinlogSplitState binlogSplitState;
 
     public MySqlSourceReader() {
-        this.serializer = new DebeziumJsonDeserializer();
+        this.serializer = new MySqlDebeziumJsonDeserializer();
         this.snapshotReaderContexts = new ArrayList<>();
     }
 
@@ -339,6 +334,8 @@ public class MySqlSourceReader implements SourceReader {
     /** Prepare binlog split */
     private SplitReadResult prepareBinlogSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
+        // Load tableSchemas from FE if available (avoids re-discover on 
restart)
+        tryLoadTableSchemasFromRequest(baseReq);
         Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, 
baseReq);
         this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
         this.binlogReader = getBinlogSplitReader(baseReq);
@@ -778,6 +775,8 @@ public class MySqlSourceReader implements SourceReader {
         configFactory.serverTimeZone(
                 
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
 
+        // Schema change handling for MySQL is not yet implemented; keep 
disabled to avoid
+        // unnecessary processing overhead until DDL support is added.
         configFactory.includeSchemaChanges(false);
 
         String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
@@ -992,7 +991,7 @@ public class MySqlSourceReader implements SourceReader {
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> config, SourceRecord 
element)
+    public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
         return serializer.deserialize(config, element);
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index a0bb57ad120..737e36045d9 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -19,6 +19,7 @@ package org.apache.doris.cdcclient.source.reader.postgres;
 
 import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.exception.CdcClientException;
+import 
org.apache.doris.cdcclient.source.deserialize.PostgresDebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -55,6 +56,7 @@ import org.apache.flink.table.types.DataType;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +86,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
 
     public PostgresSourceReader() {
         super();
+        this.setSerializer(new PostgresDebeziumJsonDeserializer());
     }
 
     @Override
@@ -95,6 +98,12 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
             createSlotForGlobalStreamSplit(dialect);
         }
         super.initialize(jobId, dataSource, config);
+        // Inject PG schema refresher so the deserializer can fetch accurate 
column types on DDL
+        if (serializer instanceof PostgresDebeziumJsonDeserializer) {
+            ((PostgresDebeziumJsonDeserializer) serializer)
+                    .setPgSchemaRefresher(
+                            tableId -> refreshSingleTableSchema(tableId, 
config, jobId));
+        }
     }
 
     /**
@@ -359,6 +368,29 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         }
     }
 
+    /**
+     * Fetch the current schema for a single table directly from PostgreSQL 
via JDBC.
+     *
+     * <p>Called by {@link PostgresDebeziumJsonDeserializer} when a schema 
change (ADD/DROP column)
+     * is detected, to obtain accurate PG column types for DDL generation.
+     *
+     * @return the fresh {@link TableChanges.TableChange}
+     */
+    private TableChanges.TableChange refreshSingleTableSchema(
+            TableId tableId, Map<String, String> config, long jobId) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId, 0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+            CustomPostgresSchema customPostgresSchema =
+                    new CustomPostgresSchema((PostgresConnection) 
jdbcConnection, sourceConfig);
+            Map<TableId, TableChanges.TableChange> schemas =
+                    
customPostgresSchema.getTableSchema(Collections.singletonList(tableId));
+            return schemas.get(tableId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected FetchTask<SourceSplitBase> createFetchTaskFromSplit(
             JobBaseConfig jobConfig, SourceSplitBase split) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
index 4d1356003fb..05407b2c89d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
@@ -52,4 +52,8 @@ public class HttpUtil {
                 .addInterceptorLast(new RequestContent(true))
                 .build();
     }
+
+    public static String getAuthHeader() {
+        return "Basic YWRtaW46";
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
new file mode 100644
index 00000000000..5eea4f1f16f
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue.replace("'", "''") + "'";
+    }
+
+    /** Escape single quotes inside a COMMENT string. */
+    public static String quoteComment(String comment) {
+        if (comment == null) {
+            return "";
+        }
+        return comment.replace("'", "''");
+    }
+
+    // ─── DDL builders 
─────────────────────────────────────────────────────────
+
+    /**
+     * Build {@code ALTER TABLE ... ADD COLUMN} SQL.
+     *
+     * @param db target database
+     * @param table target table
+     * @param colName column name
+     * @param colType Doris column type string (including optional NOT NULL)
+     * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT 
clause
+     * @param comment optional COMMENT; {@code null}/empty = omit COMMENT 
clause
+     */
+    public static String buildAddColumnSql(
+            String db,
+            String table,
+            String colName,
+            String colType,
+            String defaultValue,
+            String comment) {
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                ADD_DDL,
+                                quoteTableIdentifier(db, table),
+                                identifier(colName),
+                                colType));
+        if (defaultValue != null) {
+            sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue));
+        }
+        appendComment(sb, comment);
+        return sb.toString();
+    }
+
+    /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. */
+    public static String buildDropColumnSql(String db, String table, String 
colName) {
+        return String.format(DROP_DDL, quoteTableIdentifier(db, table), 
identifier(colName));
+    }
+
+    // ─── Type mapping 
─────────────────────────────────────────────────────────
+
+    /** Convert a Debezium Column to a Doris column type string (via PG type 
name). */
+    public static String columnToDorisType(Column column) {
+        return pgTypeNameToDorisType(column.typeName(), column.length(), 
column.scale().orElse(-1));
+    }
+
+    /** Map a PostgreSQL native type name to a Doris type string. */
+    static String pgTypeNameToDorisType(String pgTypeName, int length, int 
scale) {
+        Preconditions.checkNotNull(pgTypeName);
+        switch (pgTypeName.toLowerCase()) {
+            case "bool":
+                return DorisType.BOOLEAN;
+            case "bit":
+                return length == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+            case "int2":
+            case "smallserial":
+                return DorisType.SMALLINT;
+            case "int4":
+            case "serial":
+                return DorisType.INT;
+            case "int8":
+            case "bigserial":
+                return DorisType.BIGINT;
+            case "float4":
+                return DorisType.FLOAT;
+            case "float8":
+                return DorisType.DOUBLE;
+            case "numeric":
+                {
+                    int p = length > 0 ? Math.min(length, 38) : 38;
+                    int s = scale >= 0 ? scale : 9;
+                    return String.format("%s(%d, %d)", DorisType.DECIMAL, p, 
s);
+                }
+            case "bpchar":
+                {
+                    if (length <= 0) {
+                        return DorisType.STRING;
+                    }
+                    int len = length * 3;
+                    if (len > 255) {
+                        return String.format("%s(%s)", DorisType.VARCHAR, len);
+                    } else {
+                        return String.format("%s(%s)", DorisType.CHAR, len);
+                    }
+                }
+            case "date":
+                return DorisType.DATE;
+            case "timestamp":
+            case "timestamptz":
+                {
+                    int s = (scale >= 0 && scale <= 6) ? scale : 6;
+                    return String.format("%s(%d)", DorisType.DATETIME, s);
+                }
+                // All remaining types map to STRING (aligned with 
JdbcPostgreSQLClient)
+            case "point":
+            case "line":
+            case "lseg":
+            case "box":
+            case "path":
+            case "polygon":
+            case "circle":
+            case "varchar":
+            case "text":
+            case "time":
+            case "timetz":
+            case "interval":
+            case "cidr":
+            case "inet":
+            case "macaddr":
+            case "varbit":
+            case "uuid":
+            case "bytea":
+                return DorisType.STRING;
+            case "json":
+            case "jsonb":
+                return DorisType.JSON;
+            default:
+                LOG.warn("Unrecognized PostgreSQL type '{}', defaulting to 
STRING", pgTypeName);
+                return DorisType.STRING;
+        }
+    }
+
+    // ─── Internal helpers 
─────────────────────────────────────────────────────
+
+    private static void appendComment(StringBuilder sb, String comment) {
+        if (comment != null && !comment.isEmpty()) {
+            sb.append(" COMMENT '").append(quoteComment(comment)).append("'");
+        }
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
new file mode 100644
index 00000000000..b392df9cfcd
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via 
HTTP. */
+public class SchemaChangeManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String SCHEMA_CHANGE_API = 
"http://%s/api/query/default_cluster/%s";;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String COLUMN_EXISTS_MSG = "Can not add column which 
already exists";
+    private static final String COLUMN_NOT_EXISTS_MSG = "Column does not 
exists";
+
+    private SchemaChangeManager() {}
+
+    /**
+     * Execute a list of DDL statements on FE. Each statement is sent 
independently.
+     *
+     * <p>Idempotent errors (ADD COLUMN when column already exists, DROP 
COLUMN when column does not
+     * exist) are logged as warnings and silently skipped, so retries on a 
different BE after a
+     * failed commitOffset do not cause infinite failures.
+     *
+     * @param feAddr Doris FE address (host:port)
+     * @param db target database
+     * @param token FE auth token
+     * @param sqls DDL statements to execute
+     */
+    public static void executeDdls(String feAddr, String db, String token, 
List<String> sqls)
+            throws IOException {
+        if (sqls == null || sqls.isEmpty()) {
+            LOG.info("No DDL statements to execute");
+            return;
+        }
+        for (String stmt : sqls) {
+            stmt = stmt.trim();
+            if (stmt.isEmpty()) {
+                continue;
+            }
+            LOG.info("Executing DDL on FE {}: {}", feAddr, stmt);
+            execute(feAddr, db, token, stmt);
+        }
+    }
+
+    /**
+     * Execute a single SQL statement via the FE query API.
+     *
+     * <p>Idempotent errors are swallowed with a warning; all other errors 
throw {@link
+     * IOException}.
+     */
+    public static void execute(String feAddr, String db, String token, String 
sql)
+            throws IOException {
+        HttpPost post = buildHttpPost(feAddr, db, token, sql);
+        String responseBody = handleResponse(post);
+        LOG.info("Executed DDL {} with response: {}", sql, responseBody);
+        parseResponse(sql, responseBody);
+    }
+
+    // ─── Internal helpers 
─────────────────────────────────────────────────────
+
+    private static HttpPost buildHttpPost(String feAddr, String db, String 
token, String sql)
+            throws IOException {
+        String url = String.format(SCHEMA_CHANGE_API, feAddr, db);
+        Map<String, Object> bodyMap = new HashMap<>();
+        bodyMap.put("stmt", sql);
+        String body = OBJECT_MAPPER.writeValueAsString(bodyMap);
+
+        HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Type", "application/json;charset=UTF-8");
+        post.setHeader("Authorization", HttpUtil.getAuthHeader());
+        post.setHeader("token", token);
+        post.setEntity(new StringEntity(body, "UTF-8"));
+        return post;
+    }
+
+    private static String handleResponse(HttpPost request) throws IOException {
+        try (CloseableHttpClient client = HttpUtil.getHttpClient();
+                CloseableHttpResponse response = client.execute(request)) {
+            String responseBody =
+                    response.getEntity() != null ? 
EntityUtils.toString(response.getEntity()) : "";
+            LOG.debug("HTTP [{}]: {}", request.getURI(), responseBody);
+            return responseBody;
+        }
+    }
+
+    /**
+     * Parse the FE response. Idempotent errors are logged as warnings and 
skipped; all other errors
+     * throw.
+     *
+     * <p>Idempotent conditions (can occur when a previous commitOffset failed 
and a fresh BE
+     * re-detects and re-executes the same DDL):
+     *
+     * <ul>
+     *   <li>ADD COLUMN — "Can not add column which already exists": column 
was already added.
+     *   <li>DROP COLUMN — "Column does not exists": column was already 
dropped.
+     * </ul>
+     */
+    private static void parseResponse(String sql, String responseBody) throws 
IOException {
+        JsonNode root = OBJECT_MAPPER.readTree(responseBody);
+        JsonNode code = root.get("code");
+        if (code != null && code.asInt() == 0) {
+            return;
+        }
+
+        String msg = root.path("msg").asText("");
+
+        if (msg.contains(COLUMN_EXISTS_MSG)) {
+            LOG.warn("[DDL-IDEMPOTENT] Skipped ADD COLUMN (column already 
exists). SQL: {}", sql);
+            return;
+        }
+        if (msg.contains(COLUMN_NOT_EXISTS_MSG)) {
+            LOG.warn("[DDL-IDEMPOTENT] Skipped DROP COLUMN (column already 
absent). SQL: {}", sql);
+            return;
+        }
+
+        LOG.warn("DDL execution failed. SQL: {}. Response: {}", sql, 
responseBody);
+        throw new IOException("Failed to execute schema change: " + 
responseBody);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
new file mode 100644
index 00000000000..b71fe609d4a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link SchemaChangeHelper#pgTypeNameToDorisType}. */
+class SchemaChangeHelperTest {
+
+    // ─── Integer types 
────────────────────────────────────────────────────────
+
+    @Test
+    void integerTypes() {
+        assertEquals(DorisType.SMALLINT, map("int2", -1, -1));
+        assertEquals(DorisType.SMALLINT, map("smallserial", -1, -1));
+        assertEquals(DorisType.INT,      map("int4", -1, -1));
+        assertEquals(DorisType.INT,      map("serial", -1, -1));
+        assertEquals(DorisType.BIGINT,   map("int8", -1, -1));
+        assertEquals(DorisType.BIGINT,   map("bigserial", -1, -1));
+    }
+
+    @Test
+    void floatTypes() {
+        assertEquals(DorisType.FLOAT,  map("float4", -1, -1));
+        assertEquals(DorisType.DOUBLE, map("float8", -1, -1));
+    }
+
+    // ─── Boolean / bit 
───────────────────────────────────────────────────────
+
+    @Test
+    void boolType() {
+        assertEquals(DorisType.BOOLEAN, map("bool", -1, -1));
+    }
+
+    @Test
+    void bitType_singleBit_isBoolean() {
+        assertEquals(DorisType.BOOLEAN, map("bit", 1, -1));
+    }
+
+    @Test
+    void bitType_multiBit_isString() {
+        assertEquals(DorisType.STRING, map("bit", 8, -1));
+        assertEquals(DorisType.STRING, map("bit", 64, -1));
+    }
+
+    // ─── Numeric / decimal 
───────────────────────────────────────────────────
+
+    @Test
+    void numericType_defaultPrecisionScale() {
+        // length <= 0, scale < 0 → DECIMAL(38, 9)
+        assertEquals("DECIMAL(38, 9)", map("numeric", 0, -1));
+        assertEquals("DECIMAL(38, 9)", map("numeric", -1, -1));
+    }
+
+    @Test
+    void numericType_explicitPrecisionScale() {
+        assertEquals("DECIMAL(10, 2)", map("numeric", 10, 2));
+        assertEquals("DECIMAL(5, 0)",  map("numeric", 5, 0));
+    }
+
+    @Test
+    void numericType_precisionCappedAt38() {
+        assertEquals("DECIMAL(38, 4)", map("numeric", 50, 4));
+        assertEquals("DECIMAL(38, 9)", map("numeric", 100, -1));
+    }
+
+    // ─── Char types 
──────────────────────────────────────────────────────────
+
+    @Test
+    void bpchar_shortLength_isChar() {
+        // length=10 → 10*3=30 ≤ 255 → CHAR(30)
+        assertEquals("CHAR(30)", map("bpchar", 10, -1));
+        assertEquals("CHAR(3)",  map("bpchar", 1, -1));
+    }
+
+    @Test
+    void bpchar_longLength_isVarchar() {
+        // length=100 → 100*3=300 > 255 → VARCHAR(300)
+        assertEquals("VARCHAR(300)", map("bpchar", 100, -1));
+        assertEquals("VARCHAR(768)", map("bpchar", 256, -1));
+    }
+
+    @Test
+    void varcharAndText_isString() {
+        assertEquals(DorisType.STRING, map("varchar", 50, -1));
+        assertEquals(DorisType.STRING, map("varchar", -1, -1));
+        assertEquals(DorisType.STRING, map("text", -1, -1));
+    }
+
+    // ─── Date / time 
─────────────────────────────────────────────────────────
+
+    @Test
+    void dateType() {
+        assertEquals(DorisType.DATE, map("date", -1, -1));
+    }
+
+    @Test
+    void timestampType_defaultScale_isDatetime6() {
+        // scale < 0 or > 6 → default to 6
+        assertEquals("DATETIME(6)", map("timestamp", -1, -1));
+        assertEquals("DATETIME(6)", map("timestamptz", -1, -1));
+        assertEquals("DATETIME(6)", map("timestamp", -1, 7));
+    }
+
+    @Test
+    void timestampType_explicitScale() {
+        assertEquals("DATETIME(3)", map("timestamp", -1, 3));
+        assertEquals("DATETIME(0)", map("timestamptz", -1, 0));
+        assertEquals("DATETIME(6)", map("timestamp", -1, 6));
+    }
+
+    @Test
+    void timeTypes_isString() {
+        assertEquals(DorisType.STRING, map("time", -1, -1));
+        assertEquals(DorisType.STRING, map("timetz", -1, -1));
+        assertEquals(DorisType.STRING, map("interval", -1, -1));
+    }
+
+    // ─── JSON 
────────────────────────────────────────────────────────────────
+
+    @Test
+    void jsonTypes() {
+        assertEquals(DorisType.JSON, map("json", -1, -1));
+        assertEquals(DorisType.JSON, map("jsonb", -1, -1));
+    }
+
+    // ─── Geometric / network / misc types (all map to STRING) 
────────────────
+
+    @Test
+    void networkAndMiscTypes_isString() {
+        assertEquals(DorisType.STRING, map("inet", -1, -1));
+        assertEquals(DorisType.STRING, map("cidr", -1, -1));
+        assertEquals(DorisType.STRING, map("macaddr", -1, -1));
+        assertEquals(DorisType.STRING, map("uuid", -1, -1));
+        assertEquals(DorisType.STRING, map("bytea", -1, -1));
+        assertEquals(DorisType.STRING, map("varbit", -1, -1));
+    }
+
+    @Test
+    void geometricTypes_isString() {
+        assertEquals(DorisType.STRING, map("point", -1, -1));
+        assertEquals(DorisType.STRING, map("line", -1, -1));
+        assertEquals(DorisType.STRING, map("lseg", -1, -1));
+        assertEquals(DorisType.STRING, map("box", -1, -1));
+        assertEquals(DorisType.STRING, map("path", -1, -1));
+        assertEquals(DorisType.STRING, map("polygon", -1, -1));
+        assertEquals(DorisType.STRING, map("circle", -1, -1));
+    }
+
+    // ─── Unknown type fallback 
───────────────────────────────────────────────
+
+    @Test
+    void unknownType_defaultsToString() {
+        assertEquals(DorisType.STRING, map("custom_type", -1, -1));
+        assertEquals(DorisType.STRING, map("user_defined_enum", -1, -1));
+    }
+
+    // ─── Case-insensitive matching 
────────────────────────────────────────────
+
+    @Test
+    void caseInsensitive() {
+        assertEquals(DorisType.INT,     map("INT4", -1, -1));
+        assertEquals(DorisType.BIGINT,  map("INT8", -1, -1));
+        assertEquals(DorisType.BOOLEAN, map("BOOL", -1, -1));
+        assertEquals(DorisType.FLOAT,   map("FLOAT4", -1, -1));
+        assertEquals(DorisType.JSON,    map("JSON", -1, -1));
+        assertEquals(DorisType.STRING,  map("TEXT", -1, -1));
+    }
+
+    // ─── helper 
──────────────────────────────────────────────────────────────
+
+    private static String map(String pgType, int length, int scale) {
+        return SchemaChangeHelper.pgTypeNameToDorisType(pgType, length, scale);
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
new file mode 100644
index 00000000000..a0f35f02943
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
@@ -0,0 +1,32 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !snapshot --
+A1     1
+B1     2
+
+-- !add_column --
+A1     1       \N
+B1     2       \N
+C1     10      hello
+
+-- !add_column_dml --
+B1     99      updated
+C1     10      world
+
+-- !drop_column --
+B1     99
+C1     10
+D1     20
+
+-- !rename --
+B1     99
+C1     10
+D1     20
+E1     \N
+
+-- !modify --
+B1     99
+C1     10
+D1     20
+E1     \N
+F1     \N
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
new file mode 100644
index 00000000000..24907705c46
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !baseline --
+C1     30
+
+-- !double_add --
+C1     30      \N      \N
+D1     40      hello   42
+
+-- !rename_guard --
+C1     30      \N      \N
+D1     40      hello   42
+E1     50      \N      10
+
+-- !rename_guard_update --
+C1     30      \N      \N
+D1     99      \N      42
+E1     50      \N      10
+
+-- !default_col --
+C1     30      default_val
+D1     99      default_val
+E1     50      default_val
+F1     60      default_val
+
+-- !not_null_col --
+C1     30      default_val     required
+D1     99      default_val     required
+E1     50      default_val     required
+F1     60      default_val     required
+G1     70      g1c4    explicit
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
new file mode 100644
index 00000000000..bd590e1d97f
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Covers four scenarios in sequence on a single table:
+ *   1. ADD COLUMN    – column added in PG → DDL executed in Doris, new data 
lands correctly.
+ *                      Also verifies: pre-ADD rows get NULL for the new 
column (existing-data
+ *                      correctness), and UPDATE/DELETE right after ADD COLUMN 
are propagated.
+ *   2. DROP COLUMN   – column dropped in PG → DDL executed in Doris, 
subsequent data lands correctly.
+ *   3. RENAME COLUMN – rename detected as simultaneous ADD+DROP (rename 
guard) →
+ *                      no DDL in Doris, 'age' column remains, new rows get 
age=NULL.
+ *   4. MODIFY COLUMN – type-only change is invisible to the name-based diff →
+ *                      no DDL in Doris, data continues to flow.
+ */
+suite("test_streaming_postgres_job_sc", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName     = "test_streaming_postgres_job_name_sc"
+    def currentDb   = (sql "select database()")[0][0]
+    def table1      = "user_info_pg_normal1_sc"
+    def pgDB        = "postgres"
+    def pgSchema    = "cdc_test"
+    def pgUser      = "postgres"
+    def pgPassword  = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── helpers 
───────────────────────────────────────────────────────────
+
+        // Wait until a specific row appears in the Doris target table.
+        def waitForRow = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int > 0
+            })
+        }
+
+        // Wait until a column either exists or no longer exists in the Doris 
table.
+        def waitForColumn = { String colName, boolean shouldExist ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def desc = sql "DESC ${table1}"
+                desc.any { it[0] == colName } == shouldExist
+            })
+        }
+
+        // Wait until a specific row disappears from the Doris target table.
+        def waitForRowGone = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int == 0
+            })
+        }
+
+        // Wait until a specific column value matches the expected value for a 
row.
+        // Comparison is done as strings to avoid JDBC numeric type mismatches 
(e.g. Short vs Integer).
+        def waitForValue = { String rowName, String colName, Object expected ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql "SELECT ${colName} FROM ${table1} WHERE 
name='${rowName}'"
+                rows.size() == 1 && String.valueOf(rows[0][0]) == 
String.valueOf(expected)
+            })
+        }
+
+        // Dump job/task state on assertion failures for easier debugging.
+        def dumpJobState = {
+            log.info("jobs  : " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks : " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+        }
+
+        // ── 0. Create PG table and insert snapshot rows 
───────────────────────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgSchema}.${table1} (
+                       name VARCHAR(200) PRIMARY KEY,
+                       age  INT2
+                   )"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 2)"""
+        }
+
+        // ── 1. Start streaming job 
────────────────────────────────────────────
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the table was auto-created with the expected initial schema.
+        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() 
== 1
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def initDesc = sql "DESC ${currentDb}.${table1}"
+        assert initDesc.find { it[0] == 'name' }[1] == 'varchar(65533)' : 
"name must be varchar(65533)"
+        assert initDesc.find { it[0] == 'age'  }[1] == 'smallint'       : "age 
must be smallint"
+        assert initDesc.find { it[0] == 'name' }[3] == 'true'           : 
"name must be primary key"
+
+        // Wait for snapshot to finish (job completes ≥ 2 tasks).
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert")
+                                  where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt[0][0] as int >= 2
+            })
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Snapshot data: A1(1), B1(2)
+        qt_snapshot """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 1: ADD COLUMN c1 
────────────────────────────────────────────
+        // PG adds VARCHAR column c1; CDC detects ADD via name diff and 
executes
+        // ALTER TABLE … ADD COLUMN c1 on Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 
VARCHAR(50)"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1) VALUES 
('C1', 10, 'hello')"""
+        }
+
+        try {
+            waitForColumn('c1', true)
+            waitForRow('C1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Verify c1 was added to Doris and the new row is present.
+        assert (sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must 
exist in Doris after ADD COLUMN"
+
+        // Pre-ADD rows must have NULL for the new column (existing-data 
correctness).
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='A1'")[0][0] == null 
: "A1.c1 must be NULL (pre-ADD row)"
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='B1'")[0][0] == null 
: "B1.c1 must be NULL (pre-ADD row)"
+
+        // A1(1,null), B1(2,null), C1(10,'hello')
+        qt_add_column """ SELECT name, age, c1 FROM ${table1} ORDER BY name """
+
+        // ── Phase 1b: UPDATE / DELETE immediately after ADD COLUMN 
───────────
+        // Verifies that UPDATE (touching the new column) and DELETE on 
pre-existing rows
+        // are correctly propagated to Doris after the schema change.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // Update the new column on the just-inserted row.
+            sql """UPDATE ${pgSchema}.${table1} SET c1='world' WHERE 
name='C1'"""
+            // Update both an old column and the new column on a pre-existing 
row.
+            sql """UPDATE ${pgSchema}.${table1} SET age=99, c1='updated' WHERE 
name='B1'"""
+            // Delete a pre-existing row.
+            sql """DELETE FROM ${pgSchema}.${table1} WHERE name='A1'"""
+        }
+
+        try {
+            waitForRowGone('A1')
+            waitForValue('B1', 'age', 99)
+            waitForValue('C1', 'c1', 'world')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // A1 deleted; B1(99,'updated'); C1(10,'world')
+        qt_add_column_dml """ SELECT name, age, c1 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 2: DROP COLUMN c1 
───────────────────────────────────────────
+        // PG drops c1; CDC detects DROP and executes ALTER TABLE … DROP 
COLUMN c1 on Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age) VALUES ('D1', 
20)"""
+        }
+
+        try {
+            waitForColumn('c1', false)
+            waitForRow('D1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Verify c1 was removed from Doris and data flows without it.
+        assert !(sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must 
be gone from Doris after DROP COLUMN"
+        // B1(99), C1(10), D1(20)  [A1 was deleted in Phase 1b]
+        qt_drop_column """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 3: RENAME COLUMN age → age2 (rename guard) 
─────────────────
+        // PG rename looks like a simultaneous ADD(age2) + DROP(age) to the 
name diff.
+        // The rename guard detects this and emits a WARN with no DDL, so 
Doris schema
+        // is unchanged.  New PG rows carry 'age2' which has no matching 
column in Doris,
+        // so 'age' is NULL for those rows.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} RENAME COLUMN age TO 
age2"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES 
('E1', 30)"""
+        }
+
+        try {
+            waitForRow('E1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // 'age' must still exist; 'age2' must NOT have been added.
+        def descAfterRename = sql "DESC ${table1}"
+        assert  descAfterRename.any { it[0] == 'age'  } : "'age' column must 
remain after rename guard"
+        assert !descAfterRename.any { it[0] == 'age2' } : "'age2' must NOT be 
added (rename guard, no DDL)"
+        // B1(99), C1(10), D1(20), E1(null) — age=NULL because PG sends age2 
which Doris ignores
+        qt_rename """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 4: MODIFY COLUMN type (name-only diff, no DDL) 
─────────────
+        // Type-only change is invisible to the name-based diff, so no DDL is 
emitted.
+        // Data continues to flow; age2 values still have no mapping in Doris 
→ age=NULL.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ALTER COLUMN age2 TYPE 
INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES 
('F1', 50)"""
+        }
+
+        try {
+            waitForRow('F1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Doris 'age' column type must remain smallint (mapped from PG int2).
+        assert (sql "DESC ${table1}").find { it[0] == 'age' }[1] == 'smallint' 
\
+            : "Doris 'age' type must remain smallint after type-only change in 
PG"
+        // B1(99), C1(10), D1(20), E1(null), F1(null)
+        qt_modify """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        assert (sql """select * from jobs("type"="insert") where 
Name='${jobName}'""")[0][5] == "RUNNING"
+
+        // ── Cleanup 
───────────────────────────────────────────────────────────
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        assert (sql """select count(1) from jobs("type"="insert") where 
Name='${jobName}'""")[0][0] == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
new file mode 100644
index 00000000000..6593f1cf4f2
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Advanced schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Key differences from the basic schema-change test:
+ *   - Uses offset=latest (incremental-only, no snapshot) to cover the code 
path where
+ *     tableSchemas are discovered from PG JDBC rather than derived from 
snapshot splits.
+ *     This exercises the feHadNoSchema=true branch in PipelineCoordinator.
+ *
+ * Covers uncommon scenarios:
+ *   1. Simultaneous double ADD – two columns added in PG before any DML 
triggers detection;
+ *      both ALTER TABLEs are generated and executed in a single detection 
event.
+ *   2. DROP + ADD simultaneously (rename guard) – dropping one column while 
adding another
+ *      is treated as a potential rename; no DDL is emitted but the cached 
schema is updated.
+ *   3. UPDATE on existing rows after rename guard – verifies that a row whose 
old column (c1)
+ *      was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream 
load replaces the
+ *      whole row without c1 since PG no longer has it).
+ *   4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is 
passed through to
+ *      Doris and that pre-existing rows automatically receive the default 
value after the DDL.
+ *   5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in 
SchemaChangeHelper
+ *      (col.isOptional()=false → appends NOT NULL) and that Doris accepts the 
DDL when a
+ *      DEFAULT is present (satisfying the NOT NULL constraint for existing 
rows).
+ */
+suite("test_streaming_postgres_job_sc_advanced",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+
+    def jobName   = "test_streaming_pg_sc_advanced"
+    def currentDb = (sql "select database()")[0][0]
+    def table1    = "user_info_pg_normal1_sc_adv"
+    def pgDB      = "postgres"
+    def pgSchema  = "cdc_test"
+    def pgUser    = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── helpers 
───────────────────────────────────────────────────────────
+
+        def waitForRow = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int > 0
+            })
+        }
+
+        def waitForRowGone = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int == 0
+            })
+        }
+
+        def waitForColumn = { String colName, boolean shouldExist ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def desc = sql "DESC ${table1}"
+                desc.any { it[0] == colName } == shouldExist
+            })
+        }
+
+        // Comparison is done as strings to avoid JDBC numeric type mismatches.
+        def waitForValue = { String rowName, String colName, Object expected ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql "SELECT ${colName} FROM ${table1} WHERE 
name='${rowName}'"
+                rows.size() == 1 && String.valueOf(rows[0][0]) == 
String.valueOf(expected)
+            })
+        }
+
+        def dumpJobState = {
+            log.info("jobs  : " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks : " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+        }
+
+        // ── 0. Pre-create PG table with existing rows 
─────────────────────────
+        // A1, B1 are inserted BEFORE the job starts with offset=latest.
+        // They will NOT appear in Doris (no snapshot taken).
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgSchema}.${table1} (
+                       name VARCHAR(200) PRIMARY KEY,
+                       age  INT4
+                   )"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)"""
+        }
+
+        // ── 1. Start streaming job with offset=latest 
─────────────────────────
+        // The Doris table is auto-created from the PG schema at job creation 
time.
+        // Streaming begins from the current WAL LSN — A1 and B1 are not 
captured.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() 
== 1
+
+        // Wait for job to enter RUNNING state (streaming split established).
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(1, 
SECONDS).until({
+                def rows = sql """select Status from jobs("type"="insert")
+                                   where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                rows.size() == 1 && rows[0][0] == "RUNNING"
+            })
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Baseline: insert C1 to verify streaming is active.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)"""
+        }
+
+        try {
+            waitForRow('C1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // A1, B1 must NOT be present (offset=latest, no snapshot).
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as 
int == 0 \
+            : "A1 must not be present (offset=latest)"
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as 
int == 0 \
+            : "B1 must not be present (offset=latest)"
+
+        // Only C1(30) should be in Doris.
+        qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) 
──────────────
+        // Both ALTER TABLEs happen in PG before any DML triggers CDC 
detection.
+        // The single INSERT D1 triggers the detection, which fetches the 
fresh PG schema
+        // (already containing both c1 and c2), and generates two ADD COLUMN 
DDLs in one shot.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT"""
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) 
VALUES ('D1', 40, 'hello', 42)"""
+        }
+
+        try {
+            waitForColumn('c1', true)
+            waitForColumn('c2', true)
+            waitForRow('D1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterDoubleAdd = sql "DESC ${table1}"
+        assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 
must be added as text"
+        assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int'  : "c2 
must be added as int"
+
+        // Pre-double-ADD row C1 must have NULL for both new columns.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c1 must be NULL"
+        assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c2 must be NULL"
+
+        // C1(30,null,null), D1(40,'hello',42)
+        qt_double_add """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 2: DROP c1 + ADD c3 simultaneously (rename guard) 
──────────
+        // Dropping c1 and adding c3 in the same batch looks like a rename to 
the CDC detector:
+        // simultaneous ADD+DROP triggers the guard → no DDL emitted, cached 
schema updated to
+        // reflect the fresh PG state (c1 gone, c3 present).
+        // Doris table is left with c1 still present; c3 is never added.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c3 INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) 
VALUES ('E1', 50, 10, 99)"""
+        }
+
+        try {
+            waitForRow('E1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        def descAfterRenameGuard = sql "DESC ${table1}"
+        assert  descAfterRenameGuard.any { it[0] == 'c1' } : "c1 must remain 
(rename guard prevented DROP)"
+        assert !descAfterRenameGuard.any { it[0] == 'c3' } : "c3 must NOT be 
added (rename guard prevented ADD)"
+
+        // E1.c1=NULL: PG has c3 (not c1), Doris ignores c3 and writes NULL 
for c1.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='E1'")[0][0] == null 
: "E1.c1 must be NULL"
+
+        // C1(30,null,null), D1(40,'hello',42), E1(50,null,10)
+        qt_rename_guard """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 3: UPDATE existing row after rename guard 
───────────────────
+        // D1 had c1='hello' at insert time. After the rename guard fires, the 
cached schema
+        // reflects PG reality (c1 gone, c3 present). When D1 is updated in PG 
(only c3 exists
+        // for non-key columns), the DML record carries no c1 field. Stream 
load replaces the
+        // entire row → D1.c1 becomes NULL in Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG now has columns: name, age, c2, c3 (c1 was dropped)
+            sql """UPDATE ${pgSchema}.${table1} SET age=99, c3=88 WHERE 
name='D1'"""
+        }
+
+        try {
+            waitForValue('D1', 'age', 99)
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // D1.c1 was 'hello' but after UPDATE the stream load has no c1 field →
+        // Doris replaces the row without c1 → c1=NULL.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='D1'")[0][0] == null \
+            : "D1.c1 must be NULL after UPDATE (c1 dropped from PG, not in 
stream load record)"
+
+        // C1(30,null,null), D1(99,null,null), E1(50,null,10)
+        qt_rename_guard_update """ SELECT name, age, c1, c2 FROM ${table1} 
ORDER BY name """
+
+        // ── Phase 4: ADD COLUMN with DEFAULT value 
────────────────────────────
+        // PG adds a nullable TEXT column with a DEFAULT value.
+        // buildAddColumnSql picks up col.defaultValueExpression() and appends 
DEFAULT 'default_val'
+        // to the Doris ALTER TABLE.  After the DDL, Doris fills the default 
for all pre-existing
+        // rows (metadata operation), so C1/D1/E1 all get c4='default_val' 
without any DML replay.
+        // F1 is inserted without an explicit c4 value → PG fills in the 
default → WAL record
+        // already carries c4='default_val', so Doris writes 'default_val' for 
F1 as well.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG current cols: name, age, c2, c3
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c4 TEXT 
DEFAULT 'default_val'"""
+            // Trigger schema-change detection; omit c4 → PG fills default in 
WAL record.
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) 
VALUES ('F1', 60, 20, 77)"""
+        }
+
+        try {
+            waitForColumn('c4', true)
+            waitForRow('F1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterDefaultAdd = sql "DESC ${table1}"
+        def c4Row = descAfterDefaultAdd.find { it[0] == 'c4' }
+        assert c4Row != null : "c4 must be added"
+        assert c4Row[4] == 'default_val' : "c4 must carry DEFAULT 
'default_val', got: ${c4Row[4]}"
+
+        // Pre-existing rows receive the default value from Doris's ALTER 
TABLE (not from DML replay).
+        try {
+            waitForValue('C1', 'c4', 'default_val')
+            waitForValue('D1', 'c4', 'default_val')
+            waitForValue('E1', 'c4', 'default_val')
+            waitForValue('F1', 'c4', 'default_val')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // C1(30,_,default_val), D1(99,_,default_val), E1(50,_,default_val), 
F1(60,_,default_val)
+        qt_default_col """ SELECT name, age, c4 FROM ${table1} ORDER BY name 
"""
+
+        // ── Phase 5: ADD COLUMN NOT NULL with DEFAULT 
─────────────────────────
+        // In PG, adding a NOT NULL column to a non-empty table requires a 
DEFAULT so existing rows
+        // satisfy the constraint.  Debezium captures col.isOptional()=false, 
so SchemaChangeHelper
+        // appends NOT NULL to the Doris column type, and the DEFAULT clause 
is also passed through.
+        // With both NOT NULL and DEFAULT, Doris can apply the DDL: existing 
rows get the default
+        // value (satisfying NOT NULL), and new rows must supply a value or 
receive the default.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG current cols: name, age, c2, c3, c4
+            sql """ALTER TABLE ${pgSchema}.${table1}
+                       ADD COLUMN c5 TEXT NOT NULL DEFAULT 'required'"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3, c4, 
c5)
+                       VALUES ('G1', 70, 30, 66, 'g1c4', 'explicit')"""
+        }
+
+        try {
+            waitForColumn('c5', true)
+            waitForRow('G1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterNotNullAdd = sql "DESC ${table1}"
+        def c5Row = descAfterNotNullAdd.find { it[0] == 'c5' }
+        assert c5Row != null : "c5 must be added"
+        assert c5Row[4] == 'required' : "c5 must carry DEFAULT 'required', 
got: ${c5Row[4]}"
+
+        // Pre-existing rows must have the default value (Doris ALTER TABLE 
fills it).
+        // G1 was inserted with an explicit 'explicit' value.
+        try {
+            waitForValue('C1', 'c5', 'required')
+            waitForValue('D1', 'c5', 'required')
+            waitForValue('G1', 'c5', 'explicit')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // C1(_,default_val,required), D1(_,default_val,required), 
...G1(_,g1c4,explicit)
+        qt_not_null_col """ SELECT name, age, c4, c5 FROM ${table1} ORDER BY 
name """
+
+        assert (sql """select * from jobs("type"="insert") where 
Name='${jobName}'""")[0][5] == "RUNNING"
+
+        // ── Cleanup 
───────────────────────────────────────────────────────────
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        assert (sql """select count(1) from jobs("type"="insert") where 
Name='${jobName}'""")[0][0] == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to