This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 090b4768f82 [regression-test](streaming-job) add cdc cases 
(composite/concurrent-dml/id-gap/decimal/datetime pk) and fix split-bound 
java.time deserialize (#63471)
090b4768f82 is described below

commit 090b4768f8264af12b71924a0537924772103b3c
Author: wudi <[email protected]>
AuthorDate: Tue May 26 16:09:59 2026 +0800

    [regression-test](streaming-job) add cdc cases 
(composite/concurrent-dml/id-gap/decimal/datetime pk) and fix split-bound 
java.time deserialize (#63471)
    
    ### What problem does this PR solve?
    
    Extend CDC streaming-job regression coverage across MySQL/Postgres for
    primary-key-related scenarios that the existing suites did not exercise.
    Also include a small cdc-client follow-up to #63219 so that split-bound
    restore handles the modern java.time types that some JDBC drivers return
    by default.
    
    #### Regression cases added
    
    | Case | Tables | Guarded by |
    |---|---|---|
    | `*_composite_pk` | composite PK + full-PK mapping table | composite PK
    chunk split + INSERT/UPDATE/DELETE locating across all PK columns;
    tenant/org-boundary chunk slicing |
    | `*_snapshot_with_concurrent_dml` | 1000-row table + unrelated decoy |
    source-side INSERT/UPDATE/DELETE during snapshot phase converges
    correctly; unrelated table outside include_tables not leaked |
    | `*_id_gap_completeness` | dense id 1~100 + outlier id 10000000 +
    post-outlier 10000001~10000100 | snapshot reader covers tail rows past
    the chunk-time max without dropping; binlog DML across all three id
    regions applies correctly |
    | `*_decimal_pk` | DECIMAL(20,4) PK + (MySQL only) BIGINT UNSIGNED PK |
    BigDecimal / BigInteger arithmetic on the chunk splitter's evenly-split
    path; bigint unsigned values up to 2^64-1 |
    | `mysql_*_datetime_pk` | DATETIME(6) PK + composite (DATETIME, id) PK
    | java.time.LocalDateTime split-bound JSON round-trip; composite
    temporal PK locating |
---
 .../source/reader/AbstractCdcSourceReader.java     |  23 ++-
 .../cdc/test_streaming_mysql_job_composite_pk.out  |  47 +++++
 .../cdc/test_streaming_mysql_job_datetime_pk.out   |  29 +++
 .../cdc/test_streaming_mysql_job_decimal_pk.out    |  29 +++
 ...est_streaming_mysql_job_id_gap_completeness.out |  21 +++
 ...ming_mysql_job_snapshot_with_concurrent_dml.out |  25 +++
 .../test_streaming_postgres_job_composite_pk.out   |  47 +++++
 .../cdc/test_streaming_postgres_job_decimal_pk.out |  15 ++
 ..._streaming_postgres_job_id_gap_completeness.out |  21 +++
 ...g_postgres_job_snapshot_with_concurrent_dml.out |  25 +++
 .../test_streaming_mysql_job_composite_pk.groovy   | 198 +++++++++++++++++++++
 .../test_streaming_mysql_job_datetime_pk.groovy    | 153 ++++++++++++++++
 .../cdc/test_streaming_mysql_job_decimal_pk.groovy | 152 ++++++++++++++++
 ..._streaming_mysql_job_id_gap_completeness.groovy | 159 +++++++++++++++++
 ...g_mysql_job_snapshot_with_concurrent_dml.groovy | 148 +++++++++++++++
 ...ng_postgres_job_async_split_pause_resume.groovy |  40 ++++-
 ...test_streaming_postgres_job_composite_pk.groovy | 198 +++++++++++++++++++++
 .../test_streaming_postgres_job_decimal_pk.groovy  | 130 ++++++++++++++
 ...reaming_postgres_job_id_gap_completeness.groovy | 146 +++++++++++++++
 ...ostgres_job_snapshot_with_concurrent_dml.groovy | 153 ++++++++++++++++
 20 files changed, 1743 insertions(+), 16 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index f006d791528..438f3ef6556 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -88,6 +89,16 @@ public abstract class AbstractCdcSourceReader implements 
SourceReader {
         return out;
     }
 
+    private static final Map<Class<?>, Function<String, Object>> BOUND_PARSERS 
=
+            Map.of(
+                    java.sql.Date.class, java.sql.Date::valueOf,
+                    java.sql.Timestamp.class, java.sql.Timestamp::valueOf,
+                    java.sql.Time.class, java.sql.Time::valueOf,
+                    java.time.LocalDateTime.class, 
java.time.LocalDateTime::parse,
+                    java.time.LocalDate.class, java.time.LocalDate::parse,
+                    java.time.LocalTime.class, java.time.LocalTime::parse,
+                    java.time.OffsetDateTime.class, 
java.time.OffsetDateTime::parse);
+
     private static Object convertBound(Object v, Class<?> target, ObjectMapper 
mapper) {
         if (v == null) {
             return null;
@@ -95,15 +106,9 @@ public abstract class AbstractCdcSourceReader implements 
SourceReader {
         if (target.isInstance(v)) {
             return v;
         }
-        String s = v.toString();
-        if (target == java.sql.Date.class) {
-            return java.sql.Date.valueOf(s);
-        }
-        if (target == java.sql.Timestamp.class) {
-            return java.sql.Timestamp.valueOf(s);
-        }
-        if (target == java.sql.Time.class) {
-            return java.sql.Time.valueOf(s);
+        Function<String, Object> parser = BOUND_PARSERS.get(target);
+        if (parser != null) {
+            return parser.apply(v.toString());
         }
         return mapper.convertValue(v, target);
     }
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
new file mode 100644
index 00000000000..5558c43e9ea
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc_composite_pk --
+order_id       bigint  No      true    \N      
+tenant_id      int     No      true    \N      
+order_no       varchar(192)    Yes     false   \N      NONE
+amount decimal(10,2)   Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      1001    A_001   10.00
+1      1002    A_002   20.00
+1      1003    A_003   30.00
+1      1004    A_004   40.00
+1      1005    A_005   50.00
+2      2001    B_001   100.00
+2      2002    B_002   200.00
+2      2003    B_003   300.00
+2      2004    B_004   400.00
+2      2005    B_005   500.00
+
+-- !select_snapshot_map --
+10     100
+10     101
+10     102
+20     200
+20     201
+20     202
+
+-- !select_after_incr --
+1      1002    A_002   20.00
+1      1003    A_003   999.99
+1      1004    A_004   40.00
+1      1005    A_005   50.00
+1      1006    A_006   60.00
+2      2001    B_001   100.00
+2      2002    B_002   888.88
+2      2003    B_003   300.00
+2      2004    B_004   400.00
+2      2006    B_006   600.00
+
+-- !select_after_incr_map --
+10     101
+10     102
+10     103
+20     200
+20     201
+30     300
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
new file mode 100644
index 00000000000..abb2be0da5e
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_datetime_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B1
+2025-01-01T00:00       C1
+2025-06-15T12:34:56.999999     D1
+2026-01-01T00:00       E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C2
+2024-02-03T23:59:59.999999     4       D2
+2024-02-04T00:00       5       E2
+
+-- !select_after_incr_datetime_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B2_upd
+2025-01-01T00:00       C1
+2026-01-01T00:00       E1
+2026-06-01T00:00       F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C3_upd
+2024-02-04T00:00       5       E2
+2024-02-05T00:00       6       F3
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
new file mode 100644
index 00000000000..1f195fc3a5b
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_decimal --
+1.0000 A1
+2.5000 B1
+3.7500 C1
+4.1234 D1
+9999999999999.9999     E1
+
+-- !select_snapshot_bigint_unsigned --
+0      zero
+1      one
+9223372036854775807    signed_max
+9223372036854775808    signed_max_plus1
+18446744073709551615   unsigned_max
+
+-- !select_after_incr_decimal --
+1.0000 A1
+2.5000 B2_upd
+3.7500 C1
+5.5555 F2
+9999999999999.9999     E1
+
+-- !select_after_incr_bigint_unsigned --
+0      zero
+1      one
+9223372036854775807    signed_max_upd
+10000000000000000000   incr_huge
+18446744073709551615   unsigned_max
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
new file mode 100644
index 00000000000..f83e36329a0
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_count --
+201
+
+-- !select_snapshot_dense_sample --
+1      dense
+50     dense
+100    dense
+
+-- !select_snapshot_outlier_sample --
+10000000       outlier
+10000050       post_outlier
+10000100       post_outlier
+
+-- !select_after_incr_count --
+201
+
+-- !select_after_incr_changed --
+50     dense   99
+10000200       incr_outlier    1
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
new file mode 100644
index 00000000000..7f1b97bd503
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_count --
+1007
+
+-- !select_updates --
+1      99
+100    99
+500    99
+999    99
+
+-- !select_deletes --
+0
+
+-- !select_inserts --
+1001   concurrent_ins  1
+1002   concurrent_ins  1
+1003   concurrent_ins  1
+1004   concurrent_ins  1
+1005   concurrent_ins  1
+1006   concurrent_ins  1
+1007   concurrent_ins  1
+1008   concurrent_ins  1
+1009   concurrent_ins  1
+1010   concurrent_ins  1
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
new file mode 100644
index 00000000000..bb05bc4cd26
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc_composite_pk --
+org_id int     No      true    \N      
+item_id        bigint  No      true    \N      
+item_no        text    Yes     false   \N      NONE
+qty    int     Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      1001    A_001   10
+1      1002    A_002   20
+1      1003    A_003   30
+1      1004    A_004   40
+1      1005    A_005   50
+2      2001    B_001   100
+2      2002    B_002   200
+2      2003    B_003   300
+2      2004    B_004   400
+2      2005    B_005   500
+
+-- !select_snapshot_map --
+10     100
+10     101
+10     102
+20     200
+20     201
+20     202
+
+-- !select_after_incr --
+1      1002    A_002   20
+1      1003    A_003   999
+1      1004    A_004   40
+1      1005    A_005   50
+1      1006    A_006   60
+2      2001    B_001   100
+2      2002    B_002   888
+2      2003    B_003   300
+2      2004    B_004   400
+2      2006    B_006   600
+
+-- !select_after_incr_map --
+10     101
+10     102
+10     103
+20     200
+20     201
+30     300
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
new file mode 100644
index 00000000000..db5c46e4d07
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+1.0000 A1
+2.5000 B1
+3.7500 C1
+4.1234 D1
+9999999999999.9999     E1
+
+-- !select_after_incr --
+1.0000 A1
+2.5000 B2_upd
+3.7500 C1
+5.5555 F2
+9999999999999.9999     E1
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
new file mode 100644
index 00000000000..f83e36329a0
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_count --
+201
+
+-- !select_snapshot_dense_sample --
+1      dense
+50     dense
+100    dense
+
+-- !select_snapshot_outlier_sample --
+10000000       outlier
+10000050       post_outlier
+10000100       post_outlier
+
+-- !select_after_incr_count --
+201
+
+-- !select_after_incr_changed --
+50     dense   99
+10000200       incr_outlier    1
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
new file mode 100644
index 00000000000..7f1b97bd503
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_count --
+1007
+
+-- !select_updates --
+1      99
+100    99
+500    99
+999    99
+
+-- !select_deletes --
+0
+
+-- !select_inserts --
+1001   concurrent_ins  1
+1002   concurrent_ins  1
+1003   concurrent_ins  1
+1004   concurrent_ins  1
+1005   concurrent_ins  1
+1006   concurrent_ins  1
+1007   concurrent_ins  1
+1008   concurrent_ins  1
+1009   concurrent_ins  1
+1010   concurrent_ins  1
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
new file mode 100644
index 00000000000..188d459be5f
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_composite_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_composite_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_composite_pk_mysql"
+    def table2 = "streaming_full_pk_map_mysql"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ===== Prepare MySQL side =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """
+            create table ${mysqlDb}.${table1} (
+              `tenant_id` int not null,
+              `order_id` bigint not null,
+              `order_no` varchar(64),
+              `amount` decimal(10,2),
+              primary key (`tenant_id`, `order_id`)
+            ) engine=innodb default charset=utf8;
+            """
+
+            // Snapshot rows: 2 tenants x 5 orders each.
+            sql """insert into ${mysqlDb}.${table1} values (1, 1001, 'A_001', 
10.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1002, 'A_002', 
20.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1003, 'A_003', 
30.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1004, 'A_004', 
40.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1005, 'A_005', 
50.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2001, 'B_001', 
100.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2002, 'B_002', 
200.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2003, 'B_003', 
300.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2004, 'B_004', 
400.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2005, 'B_005', 
500.00)"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+            sql """
+            create table ${mysqlDb}.${table2} (
+              `user_id` int not null,
+              `role_id` int not null,
+              primary key (`user_id`, `role_id`)
+            ) engine=innodb default charset=utf8;
+            """
+            sql """insert into ${mysqlDb}.${table2} values (10, 100)"""
+            sql """insert into ${mysqlDb}.${table2} values (10, 101)"""
+            sql """insert into ${mysqlDb}.${table2} values (10, 102)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 200)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 201)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 202)"""
+        }
+
+        // snapshot_split_size=3 -> chunks cross the tenant boundary.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "3"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt1 = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def cnt2 = sql """select count(1) from 
${currentDb}.${table2}"""
+                        log.info("snapshot row count table1=${cnt1} 
table2=${cnt2}")
+                        cnt1.get(0).get(0) == 10 && cnt2.get(0).get(0) == 6
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def showTbl = sql """show create table ${currentDb}.${table1}"""
+        def createInfo = showTbl[0][1]
+        log.info("create table: " + createInfo)
+        assert createInfo.contains("UNIQUE KEY(`order_id`, `tenant_id`)")
+
+        def showTbl2 = sql """show create table ${currentDb}.${table2}"""
+        def createInfo2 = showTbl2[0][1]
+        log.info("create table2: " + createInfo2)
+        assert createInfo2.contains("UNIQUE KEY(`role_id`, `user_id`)")
+
+        qt_desc_composite_pk """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select tenant_id, order_id, order_no, amount 
from ${currentDb}.${table1} order by tenant_id, order_id;"""
+        qt_select_snapshot_map """select user_id, role_id from 
${currentDb}.${table2} order by user_id, role_id;"""
+
+        // ===== Binlog phase: INSERT / UPDATE / DELETE that all require 
composite-PK locating =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """insert into ${mysqlDb}.${table1} values (1, 1006, 'A_006', 
60.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2006, 'B_006', 
600.00)"""
+
+            // UPDATEs targeting composite PK; both halves of the PK in WHERE.
+            sql """update ${mysqlDb}.${table1} set amount=999.99 where 
tenant_id=1 and order_id=1003"""
+            sql """update ${mysqlDb}.${table1} set amount=888.88 where 
tenant_id=2 and order_id=2002"""
+
+            // DELETEs targeting composite PK.
+            sql """delete from ${mysqlDb}.${table1} where tenant_id=1 and 
order_id=1001"""
+            sql """delete from ${mysqlDb}.${table1} where tenant_id=2 and 
order_id=2005"""
+
+            // table2 full-PK mapping table: INSERT + DELETE only (no value 
column to UPDATE).
+            sql """insert into ${mysqlDb}.${table2} values (10, 103)"""
+            sql """insert into ${mysqlDb}.${table2} values (30, 300)"""
+            sql """delete from ${mysqlDb}.${table2} where user_id=10 and 
role_id=100"""
+            sql """delete from ${mysqlDb}.${table2} where user_id=20 and 
role_id=202"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def cntMap = sql """select count(1) from 
${currentDb}.${table2}"""
+                        def upd1 = sql """select amount from 
${currentDb}.${table1} where tenant_id=1 and order_id=1003"""
+                        def upd2 = sql """select amount from 
${currentDb}.${table1} where tenant_id=2 and order_id=2002"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${table1} where tenant_id=1 and order_id=1001"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${table1} where tenant_id=2 and order_id=2005"""
+                        def mapNew = sql """select count(1) from 
${currentDb}.${table2} where user_id=30 and role_id=300"""
+                        def mapDel = sql """select count(1) from 
${currentDb}.${table2} where user_id=10 and role_id=100"""
+                        def a1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def a2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        log.info("incr count=" + cnt + " map.count=" + cntMap 
+ " upd1.amount=" + a1 + " upd2.amount=" + a2
+                                + " del1.exists=" + del1 + " del2.exists=" + 
del2
+                                + " mapNew=" + mapNew + " mapDel=" + mapDel)
+                        cnt.get(0).get(0) == 10 &&
+                                cntMap.get(0).get(0) == 6 &&
+                                a1 != null && a1.toString() == '999.99' &&
+                                a2 != null && a2.toString() == '888.88' &&
+                                del1.get(0).get(0) == 0 &&
+                                del2.get(0).get(0) == 0 &&
+                                mapNew.get(0).get(0) == 1 &&
+                                mapDel.get(0).get(0) == 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr """select tenant_id, order_id, order_no, amount 
from ${currentDb}.${table1} order by tenant_id, order_id;"""
+        qt_select_after_incr_map """select user_id, role_id from 
${currentDb}.${table2} order by user_id, role_id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
new file mode 100644
index 00000000000..f37b60539d0
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+// Regression suite: MySQL streaming job over tables keyed by datetime(6), alone and combined with an int id.
+suite("test_streaming_mysql_job_datetime_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_datetime_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDt = "events_mysql_datetime_pk"
+    def tableComposite = "events_mysql_datetime_id_pk"
+    def mysqlDb = "test_cdc_db"
+    // Drop the job and both target tables if they already exist.
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDt} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+    // Everything below runs only when otherConfigs has enableJdbcTest=true (case-insensitive).
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+        // Seed the MySQL source: recreate both tables and insert five rows into each.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDt}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableDt} (
+                  `event_ts` datetime(6) NOT NULL,
+                  `payload` varchar(64),
+                  PRIMARY KEY (`event_ts`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2024-01-01 
00:00:00.000000', 'A1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2024-06-15 
12:00:00.123456', 'B1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2025-01-01 
00:00:00.000000', 'C1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2025-06-15 
12:34:56.999999', 'D1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2026-01-01 
00:00:00.000000', 'E1')"""
+            // Second table: composite primary key (event_ts, id); ids 1 and 2 share the same event_ts.
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+                  `event_ts` datetime(6) NOT NULL,
+                  `id` int NOT NULL,
+                  `payload` varchar(64),
+                  PRIMARY KEY (`event_ts`, `id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+        }
+        // Create the streaming job for both tables: offset "initial", snapshot_split_size 2.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableDt},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        // Poll every 2s (up to 300s) until both target tables hold 5 rows; on failure log job/task rows and rethrow.
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableDt}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        log.info("snapshot row count dt=${c1} composite=${c2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+        // Query both target tables in primary-key order through the qt_ actions.
+        qt_select_snapshot_datetime_pk """select event_ts, payload from 
${currentDb}.${tableDt} order by event_ts asc"""
+        qt_select_snapshot_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+        // Source-side changes after the snapshot check: one INSERT, one UPDATE and one DELETE per table.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+            sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2026-06-01 
00:00:00.000000', 'F2')"""
+            sql """UPDATE ${mysqlDb}.${tableDt} SET payload='B2_upd' WHERE 
event_ts='2024-06-15 12:00:00.123456'"""
+            sql """DELETE FROM ${mysqlDb}.${tableDt} WHERE 
event_ts='2025-06-15 12:34:56.999999'"""
+            // Same three operations on the composite-key table, addressing rows by event_ts and id.
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+            sql """UPDATE ${mysqlDb}.${tableComposite} SET payload='C3_upd' 
WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+            sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE 
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+        }
+        // Poll (2s, up to 180s) until counts are 5 again (+1 insert, -1 delete), payloads are updated and deleted keys are gone.
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableDt}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        def upd1 = sql """select payload from 
${currentDb}.${tableDt} where event_ts='2024-06-15 12:00:00.123456'"""
+                        def upd2 = sql """select payload from 
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and 
id=3"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${tableDt} where event_ts='2025-06-15 12:34:56.999999'"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and 
id=4"""
+                        def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        log.info("incr dt=${c1} composite=${c2} dt_upd=${p1} 
comp_upd=${p2} dt_del=${del1} comp_del=${del2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
+                                p1 == 'B2_upd' && p2 == 'C3_upd' &&
+                                del1.get(0).get(0) == 0 && del2.get(0).get(0) 
== 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+        // Query the post-change contents of both tables in primary-key order.
+        qt_select_after_incr_datetime_pk """select event_ts, payload from 
${currentDb}.${tableDt} order by event_ts asc"""
+        qt_select_after_incr_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+        // Drop the job, then assert that jobs("type"="insert") no longer lists it.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
new file mode 100644
index 00000000000..def9183809f
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+// Regression suite: MySQL streaming job over tables whose primary key is decimal(20,4) or bigint unsigned.
+suite("test_streaming_mysql_job_decimal_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_decimal_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDecimal = "streaming_decimal_pk_mysql"
+    def tableBigintUnsigned = "streaming_bigint_unsigned_pk_mysql"
+    def mysqlDb = "test_cdc_db"
+    // Drop the job and both target tables if they already exist.
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDecimal} force"""
+    sql """drop table if exists ${currentDb}.${tableBigintUnsigned} force"""
+    // Everything below runs only when otherConfigs has enableJdbcTest=true (case-insensitive).
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+        // Seed the MySQL source: recreate both tables and insert five rows into each.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDecimal}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableDecimal} (
+                  `amount` decimal(20,4) NOT NULL,
+                  `payload` varchar(64),
+                  PRIMARY KEY (`amount`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (1.0000, 
'A1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (2.5000, 
'B1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (3.7500, 
'C1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (4.1234, 
'D1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES 
(9999999999999.9999, 'E1')"""
+            // bigint unsigned table: ids 0, 1, 2^63-1, 2^63 and 2^64-1, matching the payload labels.
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableBigintUnsigned}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableBigintUnsigned} (
+                  `id` bigint unsigned NOT NULL,
+                  `payload` varchar(64),
+                  PRIMARY KEY (`id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES (0, 
'zero')"""
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES (1, 
'one')"""
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES 
(9223372036854775807, 'signed_max')"""
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES 
(9223372036854775808, 'signed_max_plus1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES 
(18446744073709551615, 'unsigned_max')"""
+        }
+        // Create the streaming job for both tables: offset "initial", snapshot_split_size 2.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = 
"${tableDecimal},${tableBigintUnsigned}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        // Poll every 2s (up to 300s) until both target tables hold 5 rows; on failure log job/task rows and rethrow.
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableDecimal}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableBigintUnsigned}"""
+                        log.info("snapshot row count decimal=${c1} 
bigint_unsigned=${c2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+        // Query both target tables in primary-key order through the qt_ actions.
+        qt_select_snapshot_decimal """select amount, payload from 
${currentDb}.${tableDecimal} order by amount asc"""
+        qt_select_snapshot_bigint_unsigned """select id, payload from 
${currentDb}.${tableBigintUnsigned} order by id asc"""
+        // Source-side changes after the snapshot check: one INSERT, one UPDATE and one DELETE per table.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (5.5555, 
'F2')"""
+            sql """UPDATE ${mysqlDb}.${tableDecimal} SET payload='B2_upd' 
WHERE amount=2.5000"""
+            sql """DELETE FROM ${mysqlDb}.${tableDecimal} WHERE 
amount=4.1234"""
+            // Same three operations on the unsigned table; the inserted id 10000000000000000000 is above 2^63-1.
+            sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES 
(10000000000000000000, 'incr_huge')"""
+            sql """UPDATE ${mysqlDb}.${tableBigintUnsigned} SET 
payload='signed_max_upd' WHERE id=9223372036854775807"""
+            sql """DELETE FROM ${mysqlDb}.${tableBigintUnsigned} WHERE 
id=9223372036854775808"""
+        }
+        // Poll (2s, up to 180s) until counts are 5 again (+1 insert, -1 delete), payloads are updated and deleted keys are gone.
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableDecimal}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableBigintUnsigned}"""
+                        def upd1 = sql """select payload from 
${currentDb}.${tableDecimal} where amount=2.5000"""
+                        def upd2 = sql """select payload from 
${currentDb}.${tableBigintUnsigned} where id=9223372036854775807"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${tableDecimal} where amount=4.1234"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${tableBigintUnsigned} where id=9223372036854775808"""
+                        def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        log.info("incr decimal=${c1} bigint_unsigned=${c2} 
dec_upd=${p1} bui_upd=${p2} dec_del=${del1} bui_del=${del2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
+                                p1 == 'B2_upd' && p2 == 'signed_max_upd' &&
+                                del1.get(0).get(0) == 0 && del2.get(0).get(0) 
== 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+        // Query the post-change contents of both tables in primary-key order.
+        qt_select_after_incr_decimal """select amount, payload from 
${currentDb}.${tableDecimal} order by amount asc"""
+        qt_select_after_incr_bigint_unsigned """select id, payload from 
${currentDb}.${tableBigintUnsigned} order by id asc"""
+        // Drop the job, then assert that jobs("type"="insert") no longer lists it.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
new file mode 100644
index 00000000000..bfd15341095
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+// Regression suite: MySQL streaming job over a bigint-keyed table holding ids 1..100, 10000000 and 10000001..10000100.
+suite("test_streaming_mysql_job_id_gap_completeness", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_id_gap_completeness_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_id_gap_mysql"
+    def mysqlDb = "test_cdc_db"
+    // Drop the job and the target table if they already exist.
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    // Everything below runs only when otherConfigs has enableJdbcTest=true (case-insensitive).
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ===== Prepare MySQL side =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `id` bigint NOT NULL,
+                  `tag` varchar(64),
+                  `version` int,
+                  PRIMARY KEY (`id`)
+                ) ENGINE=InnoDB"""
+
+            // Dense rows id 1..100
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version) 
VALUES ")
+            for (int i = 1; i <= 100; i++) {
+                if (i > 1) sb.append(", ")
+                sb.append("(${i}, 'dense', 0)")
+            }
+            sql sb.toString()
+
+            // Outlier id with a huge gap so it lands in the unbounded chunk.
+            sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version) VALUES 
(10000000, 'outlier', 0)"""
+            // Tail rows 10000001..10000100 directly after the outlier id, inserted as one multi-row statement.
+            StringBuilder sb2 = new StringBuilder()
+            sb2.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version) 
VALUES ")
+            for (int i = 1; i <= 100; i++) {
+                if (i > 1) sb2.append(", ")
+                sb2.append("(${10000000 + i}, 'post_outlier', 0)")
+            }
+            sql sb2.toString()
+        }
+
+        // snapshot_split_size=20 -> 5 bounded chunks for id<=100 + 1 
unbounded chunk for the rest.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "20"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        // Poll every 2s (up to 300s) for all 201 rows (100 + 1 + 100); on failure log job/task rows and rethrow.
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 201
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        // All three id ranges must have synced fully: dense / outlier / 
post-outlier.
+        def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM 
${currentDb}.${table1}"""
+        assert distinctCount.get(0).get(0) == 201
+        def outlierExists = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} 
WHERE id=10000000"""
+        assert outlierExists.get(0).get(0) == 1
+        def postOutlierCount = sql """SELECT COUNT(1) FROM 
${currentDb}.${table1} WHERE id BETWEEN 10000001 AND 10000100"""
+        assert postOutlierCount.get(0).get(0) == 100
+        // Query the total count plus sample ids from the dense and high-id ranges through the qt_ actions.
+        qt_select_snapshot_count """select count(1) from 
${currentDb}.${table1}"""
+        qt_select_snapshot_dense_sample """select id, tag from 
${currentDb}.${table1} where id in (1, 50, 100) order by id"""
+        qt_select_snapshot_outlier_sample """select id, tag from 
${currentDb}.${table1} where id in (10000000, 10000050, 10000100) order by id"""
+
+        // ===== Binlog phase: cover dense + outlier + post-outlier ranges 
=====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version) VALUES 
(10000200, 'incr_outlier', 1)"""
+            sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id=50"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=10000000"""
+        }
+        // Poll (2s, up to 180s) until count is 201 again (+1, -1), id 50 has version 99, 10000000 is gone, 10000200 exists.
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd = sql """select version from 
${currentDb}.${table1} where id=50"""
+                        def del = sql """select count(1) from 
${currentDb}.${table1} where id=10000000"""
+                        def ins = sql """select count(1) from 
${currentDb}.${table1} where id=10000200"""
+                        def v = upd.size() == 0 ? null : upd.get(0).get(0)
+                        log.info("incr cnt=${cnt} id50.version=${v} 
id10000000.exists=${del} id10000200.exists=${ins}")
+                        cnt.get(0).get(0) == 201 &&
+                                v != null && v.toString() == '99' &&
+                                del.get(0).get(0) == 0 &&
+                                ins.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+        // Query the post-change count and the two rows touched by the UPDATE and INSERT.
+        qt_select_after_incr_count """select count(1) from 
${currentDb}.${table1}"""
+        qt_select_after_incr_changed """select id, tag, version from 
${currentDb}.${table1} where id in (50, 10000200) order by id"""
+        // Drop the job, then assert that jobs("type"="insert") no longer lists it.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
new file mode 100644
index 00000000000..597bea37320
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+// Regression suite: MySQL streaming job whose source table gets INSERT/UPDATE/DELETE right after the job is created.
+suite("test_streaming_mysql_job_snapshot_with_concurrent_dml", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_snapshot_with_concurrent_dml_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_snapshot_dml_mysql"
+    def unrelated = "streaming_snapshot_dml_unrelated_mysql"
+    def mysqlDb = "test_cdc_db"
+    def totalRows = 1000
+    // Drop the job and both same-named tables in the target database if they already exist.
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${unrelated} force"""
+    // Everything below runs only when otherConfigs has enableJdbcTest=true (case-insensitive).
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ===== Prepare MySQL side =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `id` int NOT NULL,
+                  `tag` varchar(64),
+                  `version` int,
+                  PRIMARY KEY (`id`)
+                ) ENGINE=InnoDB"""
+            // Seed ids 1..totalRows with tag 'snap' and version 0 in a single multi-row INSERT.
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version) 
VALUES ")
+            for (int i = 1; i <= totalRows; i++) {
+                if (i > 1) sb.append(", ")
+                sb.append("(${i}, 'snap', 0)")
+            }
+            sql sb.toString()
+            // Decoy table: it is not listed in the job's include_tables.
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${unrelated}"""
+            sql """CREATE TABLE ${mysqlDb}.${unrelated} (
+                  `id` int NOT NULL,
+                  `tag` varchar(64),
+                  PRIMARY KEY (`id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${unrelated} (id, tag) VALUES (1, 
'pre_snap')"""
+        }
+
+        // snapshot_split_size=10 + snapshot_parallelism=1 -> 100 serial 
splits, slow enough that
+        // the concurrent DML below actually overlaps with snapshot.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "10",
+                    "snapshot_parallelism" = "1"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Concurrent DML on source while cdc-client is still snapshotting.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            for (int i = 1; i <= 10; i++) {
+                sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version) 
VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+            }
+            sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id IN (1, 
100, 500, 999)"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE id IN (2, 200, 
800)"""
+
+            // DML on unrelated table - must NOT leak into Doris (not in 
include_tables).
+            sql """INSERT INTO ${mysqlDb}.${unrelated} (id, tag) VALUES (2, 
'concurrent_unrelated_ins')"""
+            sql """UPDATE ${mysqlDb}.${unrelated} SET 
tag='concurrent_unrelated_upd' WHERE id=1"""
+        }
+        // Expected final row count: seeded rows + 10 inserts - 3 deletes; the wait below polls every 2s for up to 600s.
+        def expectedRows = totalRows + 10 - 3
+        try {
+            Awaitility.await().atMost(600, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd1 = sql """select version from 
${currentDb}.${table1} where id=1"""
+                        def upd999 = sql """select version from 
${currentDb}.${table1} where id=999"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${table1} where id=2"""
+                        def del800 = sql """select count(1) from 
${currentDb}.${table1} where id=800"""
+                        def ins1010 = sql """select count(1) from 
${currentDb}.${table1} where id=${totalRows + 10}"""
+                        def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def v999 = upd999.size() == 0 ? null : 
upd999.get(0).get(0)
+                        log.info("incr cnt=${cnt} v1=${v1} v999=${v999} 
del2=${del2} del800=${del800} ins1010=${ins1010}")
+                        cnt.get(0).get(0) == expectedRows &&
+                                v1 != null && v1.toString() == '99' &&
+                                v999 != null && v999.toString() == '99' &&
+                                del2.get(0).get(0) == 0 &&
+                                del800.get(0).get(0) == 0 &&
+                                ins1010.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+        // The decoy table must not show up in the target database.
+        def showUnrelated = sql """show tables from ${currentDb} like 
'${unrelated}'"""
+        assert showUnrelated.size() == 0
+        // Query the final count plus the updated, deleted and inserted ids through the qt_ actions.
+        qt_select_count """select count(1) from ${currentDb}.${table1}"""
+        qt_select_updates """select id, version from ${currentDb}.${table1} 
where id in (1, 100, 500, 999) order by id"""
+        qt_select_deletes """select count(1) from ${currentDb}.${table1} where 
id in (2, 200, 800)"""
+        qt_select_inserts """select id, tag, version from 
${currentDb}.${table1} where id > ${totalRows} order by id"""
+        // Drop the job, then assert that jobs("type"="insert") no longer lists it.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
index c367f4646e2..b367e8fefb7 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
@@ -114,19 +114,45 @@ 
suite("test_streaming_postgres_job_async_split_pause_resume",
             throw ex
         }
 
-        // Capture state, sleep, recapture — succeedTaskCount must not grow 
while paused.
-        def succeedAtPause = sql("""select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}'""")
-                .get(0).get(0).toString()
-        def rowsAtPause = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as int
-        sleep(15000)
+        // PAUSE is best-effort on the FE side: it stops new chunk dispatch, 
but any chunk
+        // already in flight on cdc_client keeps running until its stream load 
commits. Wait
+        // until SucceedTaskCount and row count stay flat for 4 consecutive 
polls (4s apart,
+        // ~12s), a window wider than the 10s observation sleep below. Two 
equal samples a few
+        // seconds apart are too weak a signal: a slow chunk could still 
commit after that
+        // and trip the row-growth assertion in the next step.
+        long lastRows = -1L
+        String lastSucceed = ""
+        int stableCount = 0
+        final int requiredStable = 4
+        Awaitility.await().atMost(120, SECONDS).pollInterval(4, 
SECONDS).until({
+            long curRows = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as long
+            String curSucceed = sql("""select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}'""")
+                    .get(0).get(0).toString()
+            if (curRows == lastRows && curSucceed == lastSucceed) {
+                stableCount++
+            } else {
+                stableCount = 1
+                lastRows = curRows
+                lastSucceed = curSucceed
+            }
+            log.info("pause-settle: rows=${curRows} succeed=${curSucceed} 
stable=${stableCount}/${requiredStable}")
+            stableCount >= requiredStable
+        })
+
+        // Capture state, sleep, recapture — succeedTaskCount must not grow 
while paused,
+        // and row count must not grow once the in-flight chunk has settled. 
The sleep below
+        // is intentionally shorter than the stability window above.
+        def succeedAtPause = lastSucceed
+        def rowsAtPause = lastRows
+        sleep(10000)
         def succeedAfterSleep = sql("""select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}'""")
                 .get(0).get(0).toString()
-        def rowsAfterSleep = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as int
+        def rowsAfterSleep = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as long
         log.info("paused: succeed ${succeedAtPause}->${succeedAfterSleep} rows 
${rowsAtPause}->${rowsAfterSleep}")
         assert succeedAfterSleep == succeedAtPause :
                 "SucceedTaskCount grew while paused (${succeedAtPause} -> 
${succeedAfterSleep}) — splitter not stopped"
         assert rowsAfterSleep == rowsAtPause :
-                "row count grew while paused (${rowsAtPause} -> 
${rowsAfterSleep}) — tasks still running"
+                "row count grew after PAUSE settled (${rowsAtPause} -> 
${rowsAfterSleep}) — new tasks dispatched while paused"
 
         def pausedStatus = sql """select status from jobs("type"="insert") 
where Name='${jobName}'"""
         assert pausedStatus.get(0).get(0) == "PAUSED" : "job didn't stay 
PAUSED"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
new file mode 100644
index 00000000000..80e580e91b8
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_composite_pk", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") { // CDC regression: streaming job over two Postgres tables with composite primary keys (snapshot, then incremental DML).
+    def jobName = "test_streaming_postgres_job_composite_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_composite_pk_pg"
+    def table2 = "streaming_full_pk_map_pg"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // start clean: drop the job and both target tables if they already exist
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is "true" (case-insensitive)
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ===== Prepare PG side: recreate both source tables and seed the snapshot rows =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                org_id     integer NOT NULL,
+                item_id    bigint  NOT NULL,
+                item_no    varchar(64),
+                qty        integer,
+                PRIMARY KEY (org_id, item_id)
+            );
+            """
+
+            // Snapshot rows: 2 orgs x 5 items each (10 rows in total).
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1001, 
'A_001', 10)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1002, 
'A_002', 20)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1003, 
'A_003', 30)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1004, 
'A_004', 40)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1005, 
'A_005', 50)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2001, 
'B_001', 100)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2002, 
'B_002', 200)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2003, 
'B_003', 300)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2004, 
'B_004', 400)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2005, 
'B_005', 500)"""
+
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" // second table: both of its columns form the primary key; seeded with 6 rows
+            sql """
+            create table ${pgDB}.${pgSchema}.${table2} (
+                user_id integer NOT NULL,
+                role_id integer NOT NULL,
+                PRIMARY KEY (user_id, role_id)
+            );
+            """
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10, 
100)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10, 
101)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10, 
102)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20, 
200)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20, 
201)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20, 
202)"""
+        }
+
+        // snapshot_split_size=3 -> chunks cross the org boundary.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "3"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try { // poll every 2s, for at most 300s, until the snapshot rows of both tables are visible in Doris
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt1 = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def cnt2 = sql """select count(1) from 
${currentDb}.${table2}"""
+                        log.info("snapshot row count table1=${cnt1} 
table2=${cnt2}")
+                        cnt1.get(0).get(0) == 10 && cnt2.get(0).get(0) == 6
+                    }
+            )
+        } catch (Exception ex) { // if the wait fails, log the job and task rows for diagnosis, then rethrow
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def showTbl = sql """show create table ${currentDb}.${table1}""" // the Doris-side DDL must list both PK columns as the UNIQUE KEY
+        def createInfo = showTbl[0][1]
+        log.info("create table: " + createInfo)
+        assert createInfo.contains("UNIQUE KEY(`org_id`, `item_id`)")
+
+        def showTbl2 = sql """show create table ${currentDb}.${table2}"""
+        def createInfo2 = showTbl2[0][1]
+        log.info("create table2: " + createInfo2)
+        assert createInfo2.contains("UNIQUE KEY(`user_id`, `role_id`)")
+
+        qt_desc_composite_pk """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select org_id, item_id, item_no, qty from 
${currentDb}.${table1} order by org_id, item_id;"""
+        qt_select_snapshot_map """select user_id, role_id from 
${currentDb}.${table2} order by user_id, role_id;"""
+
+        // ===== Binlog phase: INSERT / UPDATE / DELETE that all require 
composite-PK locating =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1006, 
'A_006', 60)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2006, 
'B_006', 600)"""
+
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET qty=999 WHERE 
org_id=1 AND item_id=1003"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET qty=888 WHERE 
org_id=2 AND item_id=2002"""
+
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE org_id=1 
AND item_id=1001"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE org_id=2 
AND item_id=2005"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10, 
103)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (30, 
300)"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table2} WHERE user_id=10 
AND role_id=100"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table2} WHERE user_id=20 
AND role_id=202"""
+        }
+
+        try { // poll every 2s, for at most 180s, until every incremental change above is visible in Doris
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def cntMap = sql """select count(1) from 
${currentDb}.${table2}"""
+                        def upd1 = sql """select qty from 
${currentDb}.${table1} where org_id=1 and item_id=1003"""
+                        def upd2 = sql """select qty from 
${currentDb}.${table1} where org_id=2 and item_id=2002"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${table1} where org_id=1 and item_id=1001"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${table1} where org_id=2 and item_id=2005"""
+                        def mapNew = sql """select count(1) from 
${currentDb}.${table2} where user_id=30 and role_id=300"""
+                        def mapDel = sql """select count(1) from 
${currentDb}.${table2} where user_id=10 and role_id=100"""
+                        def q1 = upd1.size() == 0 ? null : upd1.get(0).get(0) // null when the query returned no row
+                        def q2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        log.info("incr count=" + cnt + " map.count=" + cntMap 
+ " upd1.qty=" + q1 + " upd2.qty=" + q2
+                                + " del1.exists=" + del1 + " del2.exists=" + 
del2
+                                + " mapNew=" + mapNew + " mapDel=" + mapDel)
+                        cnt.get(0).get(0) == 10 && // 10 snapshot rows + 2 inserts - 2 deletes
+                                cntMap.get(0).get(0) == 6 && // 6 snapshot rows + 2 inserts - 2 deletes
+                                q1 != null && q1.toString() == '999' &&
+                                q2 != null && q2.toString() == '888' &&
+                                del1.get(0).get(0) == 0 &&
+                                del2.get(0).get(0) == 0 &&
+                                mapNew.get(0).get(0) == 1 &&
+                                mapDel.get(0).get(0) == 0
+                    }
+            )
+        } catch (Exception ex) { // same diagnostics as the snapshot phase, tagged "(incr)"
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr """select org_id, item_id, item_no, qty from 
${currentDb}.${table1} order by org_id, item_id;"""
+        qt_select_after_incr_map """select user_id, role_id from 
${currentDb}.${table2} order by user_id, role_id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // teardown: drop the job, then assert it is no longer listed
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
new file mode 100644
index 00000000000..17ef08a469d
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_decimal_pk", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") { // CDC regression: streaming job over a Postgres table whose primary key is numeric(20, 4) (snapshot, then incremental DML).
+    def jobName = "test_streaming_postgres_job_decimal_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_decimal_pk_pg"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // start clean: drop the job and the target table if they already exist
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is "true" (case-insensitive)
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" // recreate the source table and seed 5 snapshot rows
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                amount  numeric(20, 4) NOT NULL,
+                payload varchar(64),
+                PRIMARY KEY (amount)
+            );
+            """
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1.0000, 
'A1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2.5000, 
'B1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3.7500, 
'C1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (4.1234, 
'D1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(9999999999999.9999, 'E1')""" // one key far larger than the other four
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """ // job reads from offset "initial" with snapshot_split_size=2 over the 5 seeded rows
+
+        try { // poll every 2s, for at most 300s, until all 5 snapshot rows are visible in Doris
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) { // if the wait fails, log the job and task rows for diagnosis, then rethrow
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot """select amount, payload from 
${currentDb}.${table1} order by amount asc"""
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { // incremental phase: one INSERT, one UPDATE and one DELETE, each addressed by the decimal key
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5.5555, 
'F2')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET payload='B2_upd' 
WHERE amount=2.5000"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE 
amount=4.1234"""
+        }
+
+        try { // poll every 2s, for at most 180s, until the three changes above are visible in Doris
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd = sql """select payload from 
${currentDb}.${table1} where amount=2.5000"""
+                        def del = sql """select count(1) from 
${currentDb}.${table1} where amount=4.1234"""
+                        def p = upd.size() == 0 ? null : upd.get(0).get(0) // null when the query returned no row
+                        log.info("incr cnt=${cnt} upd.payload=${p} 
del.exists=${del}")
+                        cnt.get(0).get(0) == 5 && p == 'B2_upd' && 
del.get(0).get(0) == 0 // count is 5 again: 5 snapshot rows + 1 insert - 1 delete
+                    }
+            )
+        } catch (Exception ex) { // same diagnostics as the snapshot phase, tagged "(incr)"
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr """select amount, payload from 
${currentDb}.${table1} order by amount asc"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // teardown: drop the job, then assert it is no longer listed
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
new file mode 100644
index 00000000000..c4c913100b9
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_id_gap_completeness", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") { // CDC regression: bigint PK with a large id gap (1..100, 10000000, 10000001..10000100); checks that no row is lost.
+    def jobName = "test_streaming_postgres_job_id_gap_completeness_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_id_gap_pg"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // start clean: drop the job and the target table if they already exist
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is "true" (case-insensitive)
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ===== Prepare PG side: dense ids, one far outlier, then a run right after it =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                id      bigint PRIMARY KEY,
+                tag     varchar(64),
+                version integer
+            );
+            """
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+                   SELECT g, 'dense', 0 FROM generate_series(1, 100) g""" // 100 rows with ids 1..100
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, 
version) VALUES (10000000, 'outlier', 0)""" // a single row after a large id gap
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+                   SELECT g, 'post_outlier', 0 FROM generate_series(10000001, 
10000100) g""" // 100 rows with ids immediately after the outlier
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "20"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """ // job reads from offset "initial" with snapshot_split_size=20 over the 201 seeded rows
+
+        try { // poll every 2s, for at most 300s, until all snapshot rows are visible in Doris
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 201 // 100 dense + 1 outlier + 100 post-outlier
+                    }
+            )
+        } catch (Exception ex) { // if the wait fails, log the job and task rows for diagnosis, then rethrow
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM 
${currentDb}.${table1}"""
+        assert distinctCount.get(0).get(0) == 201 // every id is distinct, so the total is not inflated by duplicates
+        def outlierExists = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} 
WHERE id=10000000"""
+        assert outlierExists.get(0).get(0) == 1 // the outlier row itself arrived
+        def postOutlierCount = sql """SELECT COUNT(1) FROM 
${currentDb}.${table1} WHERE id BETWEEN 10000001 AND 10000100"""
+        assert postOutlierCount.get(0).get(0) == 100 // all rows after the outlier arrived
+
+        qt_select_snapshot_count """select count(1) from 
${currentDb}.${table1}"""
+        qt_select_snapshot_dense_sample """select id, tag from 
${currentDb}.${table1} where id in (1, 50, 100) order by id"""
+        qt_select_snapshot_outlier_sample """select id, tag from 
${currentDb}.${table1} where id in (10000000, 10000050, 10000100) order by id"""
+
+        // ===== Binlog phase: cover dense + outlier + post-outlier ranges 
=====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, 
version) VALUES (10000200, 'incr_outlier', 1)"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE 
id=50"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE 
id=10000000"""
+        }
+
+        try { // poll every 2s, for at most 180s, until the three changes above are visible in Doris
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd = sql """select version from 
${currentDb}.${table1} where id=50"""
+                        def del = sql """select count(1) from 
${currentDb}.${table1} where id=10000000"""
+                        def ins = sql """select count(1) from 
${currentDb}.${table1} where id=10000200"""
+                        def v = upd.size() == 0 ? null : upd.get(0).get(0) // null when the query returned no row
+                        log.info("incr cnt=${cnt} id50.version=${v} 
id10000000.exists=${del} id10000200.exists=${ins}")
+                        cnt.get(0).get(0) == 201 && // 201 snapshot rows + 1 insert - 1 delete
+                                v != null && v.toString() == '99' &&
+                                del.get(0).get(0) == 0 &&
+                                ins.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) { // same diagnostics as the snapshot phase, tagged "(incr)"
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr_count """select count(1) from 
${currentDb}.${table1}"""
+        qt_select_after_incr_changed """select id, tag, version from 
${currentDb}.${table1} where id in (50, 10000200) order by id"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // teardown: drop the job, then assert it is no longer listed
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
new file mode 100644
index 00000000000..1694862248a
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_snapshot_with_concurrent_dml", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {  // Checks that source-side INSERT/UPDATE/DELETE issued right after job creation converges in Doris, and that a table outside include_tables is not synced.
+    def jobName = 
"test_streaming_postgres_job_snapshot_with_concurrent_dml_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_snapshot_dml_pg"  // source table the job syncs (the only entry in include_tables)
+    def unrelated = "streaming_snapshot_dml_unrelated_pg"  // decoy source table left out of include_tables
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def totalRows = 1000  // rows seeded into table1 before the job is created
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""  // start clean: drop any existing job and target tables with these names
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${unrelated} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {  // everything below runs only when enableJdbcTest is "true" (case-insensitive)
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ===== Prepare PG side: recreate table1 (seeded) and the unrelated decoy table =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${unrelated}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                id      integer PRIMARY KEY,
+                tag     varchar(64),
+                version integer
+            );
+            """
+            // Seed totalRows (1000) snapshot rows, ids 1..1000, via generate_series.
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+                   SELECT g, 'snap', 0 FROM generate_series(1, ${totalRows}) 
g"""
+
+            sql """
+            create table ${pgDB}.${pgSchema}.${unrelated} (
+                id      integer PRIMARY KEY,
+                tag     varchar(64)
+            );
+            """
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${unrelated} (id, tag) 
VALUES (1, 'pre_snap')"""
+        }
+
+        // snapshot_split_size=10 + snapshot_parallelism=1 -> 100 serial 
splits, slow enough that
+        // the concurrent DML below actually overlaps with snapshot.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "10",
+                    "snapshot_parallelism" = "1"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Concurrent DML on source, issued right after CREATE JOB; intended to land while cdc-client is still snapshotting.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // 10 INSERTs with id > snapshot range (ids totalRows+1 .. totalRows+10)
+            for (int i = 1; i <= 10; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, 
version) VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+            }
+            // 4 UPDATEs (issued as one statement) spanning the chunk range
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE 
id IN (1, 100, 500, 999)"""
+            // 3 DELETEs (issued as one statement) spanning the chunk range
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id IN (2, 
200, 800)"""
+
+            // DML on unrelated table - must NOT leak into Doris (not in 
include_tables).
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${unrelated} (id, tag) 
VALUES (2, 'concurrent_unrelated_ins')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${unrelated} SET 
tag='concurrent_unrelated_upd' WHERE id=1"""
+        }
+
+        // Wait until final state matches source: 1000 + 10 inserts - 3 
deletes = 1007 rows.
+        def expectedRows = totalRows + 10 - 3
+        try {
+            Awaitility.await().atMost(600, SECONDS)  // wait up to 600s, polling every 2s, until the closure below returns true
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd1 = sql """select version from 
${currentDb}.${table1} where id=1"""
+                        def upd999 = sql """select version from 
${currentDb}.${table1} where id=999"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${table1} where id=2"""
+                        def del800 = sql """select count(1) from 
${currentDb}.${table1} where id=800"""
+                        def ins1010 = sql """select count(1) from 
${currentDb}.${table1} where id=${totalRows + 10}"""
+                        def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)  // null while row id=1 is absent from Doris
+                        def v999 = upd999.size() == 0 ? null : 
upd999.get(0).get(0)
+                        log.info("incr cnt=${cnt} v1=${v1} v999=${v999} 
del2=${del2} del800=${del800} ins1010=${ins1010}")
+                        cnt.get(0).get(0) == expectedRows &&
+                                v1 != null && v1.toString() == '99' &&
+                                v999 != null && v999.toString() == '99' &&
+                                del2.get(0).get(0) == 0 &&
+                                del800.get(0).get(0) == 0 &&
+                                ins1010.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) {  // on failure: log the job and task rows for diagnosis, then rethrow
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        // unrelated table must NOT have been synced to Doris.
+        def showUnrelated = sql """show tables from ${currentDb} like 
'${unrelated}'"""
+        assert showUnrelated.size() == 0
+
+        qt_select_count """select count(1) from ${currentDb}.${table1}"""
+        qt_select_updates """select id, version from ${currentDb}.${table1} 
where id in (1, 100, 500, 999) order by id"""
+        qt_select_deletes """select count(1) from ${currentDb}.${table1} where 
id in (2, 200, 800)"""
+        qt_select_inserts """select id, tag, version from 
${currentDb}.${table1} where id > ${totalRows} order by id"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0  // the job must no longer be listed after DROP JOB
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to