This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 090b4768f82 [regression-test](streaming-job) add cdc cases
(composite/concurrent-dml/id-gap/decimal/datetime pk) and fix split-bound
java.time deserialize (#63471)
090b4768f82 is described below
commit 090b4768f8264af12b71924a0537924772103b3c
Author: wudi <[email protected]>
AuthorDate: Tue May 26 16:09:59 2026 +0800
[regression-test](streaming-job) add cdc cases
(composite/concurrent-dml/id-gap/decimal/datetime pk) and fix split-bound
java.time deserialize (#63471)
### What problem does this PR solve?
Extends CDC streaming-job regression coverage on MySQL and Postgres to
primary-key scenarios that the existing suites did not exercise. It also
includes a small cdc-client follow-up to #63219 so that split-bound restore
handles the java.time types (LocalDateTime, LocalDate, LocalTime,
OffsetDateTime) that some JDBC drivers return by default.
#### Regression cases added
| Case | Tables | Guarded by |
|---|---|---|
| `*_composite_pk` | composite PK + full-PK mapping table | composite-PK chunk splitting; INSERT/UPDATE/DELETE located by all PK columns; chunk slicing across tenant/org boundaries |
| `*_snapshot_with_concurrent_dml` | 1000-row table + unrelated decoy table | source-side INSERT/UPDATE/DELETE issued during the snapshot phase converges correctly; the unrelated table outside include_tables is not leaked |
| `*_id_gap_completeness` | dense ids 1~100 + outlier id 10000000 + post-outlier ids 10000001~10000100 | snapshot reader covers tail rows past the chunk-time max without dropping any; binlog DML across all three id regions applies correctly |
| `*_decimal_pk` | DECIMAL(20,4) PK + (MySQL only) BIGINT UNSIGNED PK | BigDecimal / BigInteger arithmetic on the chunk splitter's evenly-sized chunk-split path; BIGINT UNSIGNED values up to 2^64-1 |
| `mysql_*_datetime_pk` | DATETIME(6) PK + composite (DATETIME, id) PK | java.time.LocalDateTime split-bound JSON round-trip; composite temporal-PK locating |
---
.../source/reader/AbstractCdcSourceReader.java | 23 ++-
.../cdc/test_streaming_mysql_job_composite_pk.out | 47 +++++
.../cdc/test_streaming_mysql_job_datetime_pk.out | 29 +++
.../cdc/test_streaming_mysql_job_decimal_pk.out | 29 +++
...est_streaming_mysql_job_id_gap_completeness.out | 21 +++
...ming_mysql_job_snapshot_with_concurrent_dml.out | 25 +++
.../test_streaming_postgres_job_composite_pk.out | 47 +++++
.../cdc/test_streaming_postgres_job_decimal_pk.out | 15 ++
..._streaming_postgres_job_id_gap_completeness.out | 21 +++
...g_postgres_job_snapshot_with_concurrent_dml.out | 25 +++
.../test_streaming_mysql_job_composite_pk.groovy | 198 +++++++++++++++++++++
.../test_streaming_mysql_job_datetime_pk.groovy | 153 ++++++++++++++++
.../cdc/test_streaming_mysql_job_decimal_pk.groovy | 152 ++++++++++++++++
..._streaming_mysql_job_id_gap_completeness.groovy | 159 +++++++++++++++++
...g_mysql_job_snapshot_with_concurrent_dml.groovy | 148 +++++++++++++++
...ng_postgres_job_async_split_pause_resume.groovy | 40 ++++-
...test_streaming_postgres_job_composite_pk.groovy | 198 +++++++++++++++++++++
.../test_streaming_postgres_job_decimal_pk.groovy | 130 ++++++++++++++
...reaming_postgres_job_id_gap_completeness.groovy | 146 +++++++++++++++
...ostgres_job_snapshot_with_concurrent_dml.groovy | 153 ++++++++++++++++
20 files changed, 1743 insertions(+), 16 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index f006d791528..438f3ef6556 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -88,6 +89,16 @@ public abstract class AbstractCdcSourceReader implements
SourceReader {
return out;
}
+ private static final Map<Class<?>, Function<String, Object>> BOUND_PARSERS
=
+ Map.of( // target type -> parser applied to the bound value's toString() text
+ java.sql.Date.class, java.sql.Date::valueOf, // java.sql types: JDBC escape-format text
+ java.sql.Timestamp.class, java.sql.Timestamp::valueOf,
+ java.sql.Time.class, java.sql.Time::valueOf, // java.time types below: ISO-8601 text via parse()
+ java.time.LocalDateTime.class,
java.time.LocalDateTime::parse,
+ java.time.LocalDate.class, java.time.LocalDate::parse,
+ java.time.LocalTime.class, java.time.LocalTime::parse,
+ java.time.OffsetDateTime.class,
java.time.OffsetDateTime::parse);
+ // Converts a restored split-bound value v to target: null stays null, instances of target pass through, types in BOUND_PARSERS are parsed from v.toString(), anything else goes through Jackson convertValue.
private static Object convertBound(Object v, Class<?> target, ObjectMapper
mapper) {
if (v == null) {
return null;
@@ -95,15 +106,9 @@ public abstract class AbstractCdcSourceReader implements
SourceReader {
if (target.isInstance(v)) {
return v;
}
- String s = v.toString();
- if (target == java.sql.Date.class) {
- return java.sql.Date.valueOf(s);
- }
- if (target == java.sql.Timestamp.class) {
- return java.sql.Timestamp.valueOf(s);
- }
- if (target == java.sql.Time.class) {
- return java.sql.Time.valueOf(s);
+ Function<String, Object> parser = BOUND_PARSERS.get(target);
+ if (parser != null) { // no registered parser -> fall through to mapper.convertValue below
+ return parser.apply(v.toString());
}
return mapper.convertValue(v, target);
}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
new file mode 100644
index 00000000000..5558c43e9ea
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_composite_pk --
+order_id bigint No true \N
+tenant_id int No true \N
+order_no varchar(192) Yes false \N NONE
+amount decimal(10,2) Yes false \N NONE
+
+-- !select_snapshot --
+1 1001 A_001 10.00
+1 1002 A_002 20.00
+1 1003 A_003 30.00
+1 1004 A_004 40.00
+1 1005 A_005 50.00
+2 2001 B_001 100.00
+2 2002 B_002 200.00
+2 2003 B_003 300.00
+2 2004 B_004 400.00
+2 2005 B_005 500.00
+
+-- !select_snapshot_map --
+10 100
+10 101
+10 102
+20 200
+20 201
+20 202
+
+-- !select_after_incr --
+1 1002 A_002 20.00
+1 1003 A_003 999.99
+1 1004 A_004 40.00
+1 1005 A_005 50.00
+1 1006 A_006 60.00
+2 2001 B_001 100.00
+2 2002 B_002 888.88
+2 2003 B_003 300.00
+2 2004 B_004 400.00
+2 2006 B_006 600.00
+
+-- !select_after_incr_map --
+10 101
+10 102
+10 103
+20 200
+20 201
+30 300
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
new file mode 100644
index 00000000000..abb2be0da5e
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_datetime_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B1
+2025-01-01T00:00 C1
+2025-06-15T12:34:56.999999 D1
+2026-01-01T00:00 E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C2
+2024-02-03T23:59:59.999999 4 D2
+2024-02-04T00:00 5 E2
+
+-- !select_after_incr_datetime_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B2_upd
+2025-01-01T00:00 C1
+2026-01-01T00:00 E1
+2026-06-01T00:00 F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C3_upd
+2024-02-04T00:00 5 E2
+2024-02-05T00:00 6 F3
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
new file mode 100644
index 00000000000..1f195fc3a5b
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_decimal --
+1.0000 A1
+2.5000 B1
+3.7500 C1
+4.1234 D1
+9999999999999.9999 E1
+
+-- !select_snapshot_bigint_unsigned --
+0 zero
+1 one
+9223372036854775807 signed_max
+9223372036854775808 signed_max_plus1
+18446744073709551615 unsigned_max
+
+-- !select_after_incr_decimal --
+1.0000 A1
+2.5000 B2_upd
+3.7500 C1
+5.5555 F2
+9999999999999.9999 E1
+
+-- !select_after_incr_bigint_unsigned --
+0 zero
+1 one
+9223372036854775807 signed_max_upd
+10000000000000000000 incr_huge
+18446744073709551615 unsigned_max
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
new file mode 100644
index 00000000000..f83e36329a0
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_count --
+201
+
+-- !select_snapshot_dense_sample --
+1 dense
+50 dense
+100 dense
+
+-- !select_snapshot_outlier_sample --
+10000000 outlier
+10000050 post_outlier
+10000100 post_outlier
+
+-- !select_after_incr_count --
+201
+
+-- !select_after_incr_changed --
+50 dense 99
+10000200 incr_outlier 1
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
new file mode 100644
index 00000000000..7f1b97bd503
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_count --
+1007
+
+-- !select_updates --
+1 99
+100 99
+500 99
+999 99
+
+-- !select_deletes --
+0
+
+-- !select_inserts --
+1001 concurrent_ins 1
+1002 concurrent_ins 1
+1003 concurrent_ins 1
+1004 concurrent_ins 1
+1005 concurrent_ins 1
+1006 concurrent_ins 1
+1007 concurrent_ins 1
+1008 concurrent_ins 1
+1009 concurrent_ins 1
+1010 concurrent_ins 1
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
new file mode 100644
index 00000000000..bb05bc4cd26
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_composite_pk --
+org_id int No true \N
+item_id bigint No true \N
+item_no text Yes false \N NONE
+qty int Yes false \N NONE
+
+-- !select_snapshot --
+1 1001 A_001 10
+1 1002 A_002 20
+1 1003 A_003 30
+1 1004 A_004 40
+1 1005 A_005 50
+2 2001 B_001 100
+2 2002 B_002 200
+2 2003 B_003 300
+2 2004 B_004 400
+2 2005 B_005 500
+
+-- !select_snapshot_map --
+10 100
+10 101
+10 102
+20 200
+20 201
+20 202
+
+-- !select_after_incr --
+1 1002 A_002 20
+1 1003 A_003 999
+1 1004 A_004 40
+1 1005 A_005 50
+1 1006 A_006 60
+2 2001 B_001 100
+2 2002 B_002 888
+2 2003 B_003 300
+2 2004 B_004 400
+2 2006 B_006 600
+
+-- !select_after_incr_map --
+10 101
+10 102
+10 103
+20 200
+20 201
+30 300
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
new file mode 100644
index 00000000000..db5c46e4d07
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+1.0000 A1
+2.5000 B1
+3.7500 C1
+4.1234 D1
+9999999999999.9999 E1
+
+-- !select_after_incr --
+1.0000 A1
+2.5000 B2_upd
+3.7500 C1
+5.5555 F2
+9999999999999.9999 E1
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
new file mode 100644
index 00000000000..f83e36329a0
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_count --
+201
+
+-- !select_snapshot_dense_sample --
+1 dense
+50 dense
+100 dense
+
+-- !select_snapshot_outlier_sample --
+10000000 outlier
+10000050 post_outlier
+10000100 post_outlier
+
+-- !select_after_incr_count --
+201
+
+-- !select_after_incr_changed --
+50 dense 99
+10000200 incr_outlier 1
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
new file mode 100644
index 00000000000..7f1b97bd503
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_count --
+1007
+
+-- !select_updates --
+1 99
+100 99
+500 99
+999 99
+
+-- !select_deletes --
+0
+
+-- !select_inserts --
+1001 concurrent_ins 1
+1002 concurrent_ins 1
+1003 concurrent_ins 1
+1004 concurrent_ins 1
+1005 concurrent_ins 1
+1006 concurrent_ins 1
+1007 concurrent_ins 1
+1008 concurrent_ins 1
+1009 concurrent_ins 1
+1010 concurrent_ins 1
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
new file mode 100644
index 00000000000..188d459be5f
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_composite_pk",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_composite_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_composite_pk_mysql"
+ def table2 = "streaming_full_pk_map_mysql"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${table2} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side: composite-PK table1 + full-PK mapping table2 =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ `tenant_id` int not null,
+ `order_id` bigint not null,
+ `order_no` varchar(64),
+ `amount` decimal(10,2),
+ primary key (`tenant_id`, `order_id`)
+ ) engine=innodb default charset=utf8;
+ """
+
+ // Snapshot rows: 2 tenants x 5 orders each (10 rows total).
+ sql """insert into ${mysqlDb}.${table1} values (1, 1001, 'A_001',
10.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (1, 1002, 'A_002',
20.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (1, 1003, 'A_003',
30.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (1, 1004, 'A_004',
40.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (1, 1005, 'A_005',
50.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2001, 'B_001',
100.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2002, 'B_002',
200.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2003, 'B_003',
300.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2004, 'B_004',
400.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2005, 'B_005',
500.00)"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+ sql """
+ create table ${mysqlDb}.${table2} (
+ `user_id` int not null,
+ `role_id` int not null,
+ primary key (`user_id`, `role_id`)
+ ) engine=innodb default charset=utf8;
+ """
+ sql """insert into ${mysqlDb}.${table2} values (10, 100)"""
+ sql """insert into ${mysqlDb}.${table2} values (10, 101)"""
+ sql """insert into ${mysqlDb}.${table2} values (10, 102)"""
+ sql """insert into ${mysqlDb}.${table2} values (20, 200)"""
+ sql """insert into ${mysqlDb}.${table2} values (20, 201)"""
+ sql """insert into ${mysqlDb}.${table2} values (20, 202)"""
+ }
+
+ // snapshot_split_size=3 -> chunks cross the tenant boundary.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "initial",
+ "snapshot_split_size" = "3"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt1 = sql """select count(1) from
${currentDb}.${table1}"""
+ def cnt2 = sql """select count(1) from
${currentDb}.${table2}"""
+ log.info("snapshot row count table1=${cnt1}
table2=${cnt2}")
+ cnt1.get(0).get(0) == 10 && cnt2.get(0).get(0) == 6 // all snapshot rows of both tables present
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ def showTbl = sql """show create table ${currentDb}.${table1}"""
+ def createInfo = showTbl[0][1]
+ log.info("create table: " + createInfo)
+ assert createInfo.contains("UNIQUE KEY(`order_id`, `tenant_id`)") // expected Doris key order differs from the MySQL PK order (tenant_id, order_id)
+
+ def showTbl2 = sql """show create table ${currentDb}.${table2}"""
+ def createInfo2 = showTbl2[0][1]
+ log.info("create table2: " + createInfo2)
+ assert createInfo2.contains("UNIQUE KEY(`role_id`, `user_id`)") // MySQL PK order is (user_id, role_id)
+
+ qt_desc_composite_pk """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select tenant_id, order_id, order_no, amount
from ${currentDb}.${table1} order by tenant_id, order_id;"""
+ qt_select_snapshot_map """select user_id, role_id from
${currentDb}.${table2} order by user_id, role_id;"""
+
+ // ===== Binlog phase: INSERT / UPDATE / DELETE that all require
composite-PK locating =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """insert into ${mysqlDb}.${table1} values (1, 1006, 'A_006',
60.00)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 2006, 'B_006',
600.00)"""
+
+ // UPDATEs located by the composite PK: both PK columns in WHERE.
+ sql """update ${mysqlDb}.${table1} set amount=999.99 where
tenant_id=1 and order_id=1003"""
+ sql """update ${mysqlDb}.${table1} set amount=888.88 where
tenant_id=2 and order_id=2002"""
+
+ // DELETEs located by the composite PK: both PK columns in WHERE.
+ sql """delete from ${mysqlDb}.${table1} where tenant_id=1 and
order_id=1001"""
+ sql """delete from ${mysqlDb}.${table1} where tenant_id=2 and
order_id=2005"""
+
+ // table2 full-PK mapping table: INSERT + DELETE only (no value
column to UPDATE).
+ sql """insert into ${mysqlDb}.${table2} values (10, 103)"""
+ sql """insert into ${mysqlDb}.${table2} values (30, 300)"""
+ sql """delete from ${mysqlDb}.${table2} where user_id=10 and
role_id=100"""
+ sql """delete from ${mysqlDb}.${table2} where user_id=20 and
role_id=202"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def cntMap = sql """select count(1) from
${currentDb}.${table2}"""
+ def upd1 = sql """select amount from
${currentDb}.${table1} where tenant_id=1 and order_id=1003"""
+ def upd2 = sql """select amount from
${currentDb}.${table1} where tenant_id=2 and order_id=2002"""
+ def del1 = sql """select count(1) from
${currentDb}.${table1} where tenant_id=1 and order_id=1001"""
+ def del2 = sql """select count(1) from
${currentDb}.${table1} where tenant_id=2 and order_id=2005"""
+ def mapNew = sql """select count(1) from
${currentDb}.${table2} where user_id=30 and role_id=300"""
+ def mapDel = sql """select count(1) from
${currentDb}.${table2} where user_id=10 and role_id=100"""
+ def a1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def a2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ log.info("incr count=" + cnt + " map.count=" + cntMap
+ " upd1.amount=" + a1 + " upd2.amount=" + a2
+ + " del1.exists=" + del1 + " del2.exists=" +
del2
+ + " mapNew=" + mapNew + " mapDel=" + mapDel)
+ cnt.get(0).get(0) == 10 && // 10 snapshot rows + 2 inserts - 2 deletes
+ cntMap.get(0).get(0) == 6 && // 6 snapshot rows + 2 inserts - 2 deletes
+ a1 != null && a1.toString() == '999.99' &&
+ a2 != null && a2.toString() == '888.88' &&
+ del1.get(0).get(0) == 0 &&
+ del2.get(0).get(0) == 0 &&
+ mapNew.get(0).get(0) == 1 &&
+ mapDel.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select tenant_id, order_id, order_no, amount
from ${currentDb}.${table1} order by tenant_id, order_id;"""
+ qt_select_after_incr_map """select user_id, role_id from
${currentDb}.${table2} order by user_id, role_id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
new file mode 100644
index 00000000000..f37b60539d0
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_datetime_pk.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_datetime_pk",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_datetime_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableDt = "events_mysql_datetime_pk" // single-column DATETIME(6) primary key
+ def tableComposite = "events_mysql_datetime_id_pk" // composite (event_ts DATETIME(6), id) primary key
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableDt} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDt}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableDt} (
+ `event_ts` datetime(6) NOT NULL,
+ `payload` varchar(64),
+ PRIMARY KEY (`event_ts`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2024-01-01
00:00:00.000000', 'A1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2024-06-15
12:00:00.123456', 'B1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2025-01-01
00:00:00.000000', 'C1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2025-06-15
12:34:56.999999', 'D1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2026-01-01
00:00:00.000000', 'E1')"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+ `event_ts` datetime(6) NOT NULL,
+ `id` int NOT NULL,
+ `payload` varchar(64),
+ PRIMARY KEY (`event_ts`, `id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+ }
+ // Small snapshot_split_size so each 5-row table is expected to be split into several chunks with DATETIME split bounds.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${tableDt},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableDt}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ log.info("snapshot row count dt=${c1} composite=${c2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 // all 5 snapshot rows of each table present
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_datetime_pk """select event_ts, payload from
${currentDb}.${tableDt} order by event_ts asc"""
+ qt_select_snapshot_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+ // Binlog phase: one INSERT, UPDATE and DELETE per table, each UPDATE/DELETE located by its temporal PK.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+ sql """INSERT INTO ${mysqlDb}.${tableDt} VALUES ('2026-06-01
00:00:00.000000', 'F2')"""
+ sql """UPDATE ${mysqlDb}.${tableDt} SET payload='B2_upd' WHERE
event_ts='2024-06-15 12:00:00.123456'"""
+ sql """DELETE FROM ${mysqlDb}.${tableDt} WHERE
event_ts='2025-06-15 12:34:56.999999'"""
+
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+ sql """UPDATE ${mysqlDb}.${tableComposite} SET payload='C3_upd'
WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+ sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableDt}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ def upd1 = sql """select payload from
${currentDb}.${tableDt} where event_ts='2024-06-15 12:00:00.123456'"""
+ def upd2 = sql """select payload from
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and
id=3"""
+ def del1 = sql """select count(1) from
${currentDb}.${tableDt} where event_ts='2025-06-15 12:34:56.999999'"""
+ def del2 = sql """select count(1) from
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and
id=4"""
+ def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ log.info("incr dt=${c1} composite=${c2} dt_upd=${p1}
comp_upd=${p2} dt_del=${del1} comp_del=${del2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && // per table: 5 rows + 1 insert - 1 delete
+ p1 == 'B2_upd' && p2 == 'C3_upd' &&
+ del1.get(0).get(0) == 0 && del2.get(0).get(0)
== 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr_datetime_pk """select event_ts, payload from
${currentDb}.${tableDt} order by event_ts asc"""
+ qt_select_after_incr_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
new file mode 100644
index 00000000000..def9183809f
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_decimal_pk.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression: MySQL streaming CDC job whose source tables are keyed by
+// DECIMAL(20,4) and BIGINT UNSIGNED primary keys. Verifies the initial snapshot
+// and then binlog INSERT/UPDATE/DELETE on both tables.
+suite("test_streaming_mysql_job_decimal_pk",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_decimal_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableDecimal = "streaming_decimal_pk_mysql"
+ def tableBigintUnsigned = "streaming_bigint_unsigned_pk_mysql"
+ def mysqlDb = "test_cdc_db"
+
+ // Remove any Doris-side job/tables left over from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableDecimal} force"""
+ sql """drop table if exists ${currentDb}.${tableBigintUnsigned} force"""
+
+ // Everything below only runs when enableJdbcTest=true in the regression config.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDecimal}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableDecimal} (
+ `amount` decimal(20,4) NOT NULL,
+ `payload` varchar(64),
+ PRIMARY KEY (`amount`)
+ ) ENGINE=InnoDB"""
+ // Five DECIMAL PK rows; the last value is far larger than the others.
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (1.0000,
'A1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (2.5000,
'B1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (3.7500,
'C1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (4.1234,
'D1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES
(9999999999999.9999, 'E1')"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableBigintUnsigned}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableBigintUnsigned} (
+ `id` bigint unsigned NOT NULL,
+ `payload` varchar(64),
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ // Five BIGINT UNSIGNED PK rows: 0, 1, 2^63-1 (signed max), 2^63, and
+ // 2^64-1 (unsigned max).
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES (0,
'zero')"""
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES (1,
'one')"""
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES
(9223372036854775807, 'signed_max')"""
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES
(9223372036854775808, 'signed_max_plus1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES
(18446744073709551615, 'unsigned_max')"""
+ }
+
+ // snapshot_split_size=2 is small relative to the 5 rows per table, presumably
+ // so each table's snapshot is split into several chunks (TODO confirm against
+ // the cdc-client splitter).
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" =
"${tableDecimal},${tableBigintUnsigned}",
+ "offset" = "initial",
+ "snapshot_split_size" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait (up to 300s) until both Doris tables hold all 5 snapshot rows.
+ // On timeout, log the job and task rows before rethrowing.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableDecimal}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableBigintUnsigned}"""
+ log.info("snapshot row count decimal=${c1}
bigint_unsigned=${c2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // Record the snapshot contents, ordered by primary key.
+ qt_select_snapshot_decimal """select amount, payload from
${currentDb}.${tableDecimal} order by amount asc"""
+ qt_select_snapshot_bigint_unsigned """select id, payload from
${currentDb}.${tableBigintUnsigned} order by id asc"""
+
+ // ===== Binlog phase: one INSERT, UPDATE and DELETE per table =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${tableDecimal} VALUES (5.5555,
'F2')"""
+ sql """UPDATE ${mysqlDb}.${tableDecimal} SET payload='B2_upd'
WHERE amount=2.5000"""
+ sql """DELETE FROM ${mysqlDb}.${tableDecimal} WHERE
amount=4.1234"""
+
+ sql """INSERT INTO ${mysqlDb}.${tableBigintUnsigned} VALUES
(10000000000000000000, 'incr_huge')"""
+ sql """UPDATE ${mysqlDb}.${tableBigintUnsigned} SET
payload='signed_max_upd' WHERE id=9223372036854775807"""
+ sql """DELETE FROM ${mysqlDb}.${tableBigintUnsigned} WHERE
id=9223372036854775808"""
+ }
+
+ // Each table gains one row and loses one, so both should settle back at
+ // 5 rows with the updated payloads present and the deleted keys gone.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableDecimal}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableBigintUnsigned}"""
+ def upd1 = sql """select payload from
${currentDb}.${tableDecimal} where amount=2.5000"""
+ def upd2 = sql """select payload from
${currentDb}.${tableBigintUnsigned} where id=9223372036854775807"""
+ def del1 = sql """select count(1) from
${currentDb}.${tableDecimal} where amount=4.1234"""
+ def del2 = sql """select count(1) from
${currentDb}.${tableBigintUnsigned} where id=9223372036854775808"""
+ def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ log.info("incr decimal=${c1} bigint_unsigned=${c2}
dec_upd=${p1} bui_upd=${p2} dec_del=${del1} bui_del=${del2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
+ p1 == 'B2_upd' && p2 == 'signed_max_upd' &&
+ del1.get(0).get(0) == 0 && del2.get(0).get(0)
== 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ // Record the post-incremental contents, ordered by primary key.
+ qt_select_after_incr_decimal """select amount, payload from
${currentDb}.${tableDecimal} order by amount asc"""
+ qt_select_after_incr_bigint_unsigned """select id, payload from
${currentDb}.${tableBigintUnsigned} order by id asc"""
+
+ // Drop the job and confirm it is no longer listed by jobs("type"="insert").
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
new file mode 100644
index 00000000000..bfd15341095
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_id_gap_completeness.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression: MySQL streaming CDC job over a BIGINT PK table with a large id gap
+// (dense ids 1..100, one outlier at 10000000, then 10000001..10000100).
+// Verifies that the snapshot copies all 201 rows, and that binlog DML touching
+// the dense range, the outlier and a new id past the post-outlier range is applied.
+suite("test_streaming_mysql_job_id_gap_completeness",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_id_gap_completeness_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_id_gap_mysql"
+ def mysqlDb = "test_cdc_db"
+
+ // Remove any Doris-side job/table left over from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ // Everything below only runs when enableJdbcTest=true in the regression config.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `id` bigint NOT NULL,
+ `tag` varchar(64),
+ `version` int,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+
+ // Dense rows id 1..100
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version)
VALUES ")
+ for (int i = 1; i <= 100; i++) {
+ if (i > 1) sb.append(", ")
+ sb.append("(${i}, 'dense', 0)")
+ }
+ sql sb.toString()
+
+ // Outlier id with a huge gap so it lands in the unbounded chunk.
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version) VALUES
(10000000, 'outlier', 0)"""
+
+ // 100 rows right after the outlier: ids 10000001..10000100, in one INSERT.
+ StringBuilder sb2 = new StringBuilder()
+ sb2.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version)
VALUES ")
+ for (int i = 1; i <= 100; i++) {
+ if (i > 1) sb2.append(", ")
+ sb2.append("(${10000000 + i}, 'post_outlier', 0)")
+ }
+ sql sb2.toString()
+ }
+
+ // snapshot_split_size=20 -> 5 bounded chunks for id<=100 + 1
unbounded chunk for the rest.
+ // NOTE(review): the exact chunk layout is decided by the cdc-client splitter
+ // and is not asserted here; the checks below only require that every row is copied.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "20"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait (up to 300s) for 100 dense + 1 outlier + 100 post-outlier = 201 rows.
+ // On timeout, log the job and task rows before rethrowing.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 201
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // All three id ranges must have synced fully: dense / outlier /
post-outlier.
+ def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM
${currentDb}.${table1}"""
+ assert distinctCount.get(0).get(0) == 201
+ def outlierExists = sql """SELECT COUNT(1) FROM ${currentDb}.${table1}
WHERE id=10000000"""
+ assert outlierExists.get(0).get(0) == 1
+ def postOutlierCount = sql """SELECT COUNT(1) FROM
${currentDb}.${table1} WHERE id BETWEEN 10000001 AND 10000100"""
+ assert postOutlierCount.get(0).get(0) == 100
+
+ qt_select_snapshot_count """select count(1) from
${currentDb}.${table1}"""
+ qt_select_snapshot_dense_sample """select id, tag from
${currentDb}.${table1} where id in (1, 50, 100) order by id"""
+ qt_select_snapshot_outlier_sample """select id, tag from
${currentDb}.${table1} where id in (10000000, 10000050, 10000100) order by id"""
+
+ // ===== Binlog phase: cover dense + outlier + post-outlier ranges
=====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version) VALUES
(10000200, 'incr_outlier', 1)"""
+ sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id=50"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=10000000"""
+ }
+
+ // One insert and one delete keep the count at 201; id 50 must reach
+ // version 99, id 10000000 must be gone and id 10000200 must exist.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd = sql """select version from
${currentDb}.${table1} where id=50"""
+ def del = sql """select count(1) from
${currentDb}.${table1} where id=10000000"""
+ def ins = sql """select count(1) from
${currentDb}.${table1} where id=10000200"""
+ def v = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} id50.version=${v}
id10000000.exists=${del} id10000200.exists=${ins}")
+ cnt.get(0).get(0) == 201 &&
+ v != null && v.toString() == '99' &&
+ del.get(0).get(0) == 0 &&
+ ins.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ // Record the post-incremental count and the changed rows.
+ qt_select_after_incr_count """select count(1) from
${currentDb}.${table1}"""
+ qt_select_after_incr_changed """select id, tag, version from
${currentDb}.${table1} where id in (50, 10000200) order by id"""
+
+ // Drop the job and confirm it is no longer listed by jobs("type"="insert").
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
new file mode 100644
index 00000000000..597bea37320
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_with_concurrent_dml.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression: MySQL streaming CDC job with source-side INSERT/UPDATE/DELETE issued
+// immediately after the job is created, while the 1000-row snapshot may still be
+// in progress. Also verifies that a table outside include_tables is never
+// created on the Doris side.
+suite("test_streaming_mysql_job_snapshot_with_concurrent_dml",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_snapshot_with_concurrent_dml_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_snapshot_dml_mysql"
+ def unrelated = "streaming_snapshot_dml_unrelated_mysql"
+ def mysqlDb = "test_cdc_db"
+ def totalRows = 1000
+
+ // Remove any Doris-side job/tables left over from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${unrelated} force"""
+
+ // Everything below only runs when enableJdbcTest=true in the regression config.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `id` int NOT NULL,
+ `tag` varchar(64),
+ `version` int,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+
+ // Seed ids 1..totalRows with a single multi-row INSERT.
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${mysqlDb}.${table1} (id, tag, version)
VALUES ")
+ for (int i = 1; i <= totalRows; i++) {
+ if (i > 1) sb.append(", ")
+ sb.append("(${i}, 'snap', 0)")
+ }
+ sql sb.toString()
+
+ // Decoy table in the same MySQL database; it is deliberately left
+ // out of include_tables in the job below.
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${unrelated}"""
+ sql """CREATE TABLE ${mysqlDb}.${unrelated} (
+ `id` int NOT NULL,
+ `tag` varchar(64),
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${unrelated} (id, tag) VALUES (1,
'pre_snap')"""
+ }
+
+ // snapshot_split_size=10 + snapshot_parallelism=1 -> 100 serial
splits, slow enough that
+ // the concurrent DML below actually overlaps with snapshot.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "10",
+ "snapshot_parallelism" = "1"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Concurrent DML on source while cdc-client is still snapshotting.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ for (int i = 1; i <= 10; i++) {
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, tag, version)
VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+ }
+ sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id IN (1,
100, 500, 999)"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE id IN (2, 200,
800)"""
+
+ // DML on unrelated table - must NOT leak into Doris (not in
include_tables).
+ sql """INSERT INTO ${mysqlDb}.${unrelated} (id, tag) VALUES (2,
'concurrent_unrelated_ins')"""
+ sql """UPDATE ${mysqlDb}.${unrelated} SET
tag='concurrent_unrelated_upd' WHERE id=1"""
+ }
+
+ // Expected final row count: totalRows seeded + 10 inserted - 3 deleted.
+ def expectedRows = totalRows + 10 - 3
+ // Wait (up to 600s) for snapshot + concurrent DML to converge. Spot-checks
+ // updates at ids 1 and 999, deletes at ids 2 and 800, and the last insert.
+ // On timeout, log the job and task rows before rethrowing.
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select version from
${currentDb}.${table1} where id=1"""
+ def upd999 = sql """select version from
${currentDb}.${table1} where id=999"""
+ def del2 = sql """select count(1) from
${currentDb}.${table1} where id=2"""
+ def del800 = sql """select count(1) from
${currentDb}.${table1} where id=800"""
+ def ins1010 = sql """select count(1) from
${currentDb}.${table1} where id=${totalRows + 10}"""
+ def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def v999 = upd999.size() == 0 ? null :
upd999.get(0).get(0)
+ log.info("incr cnt=${cnt} v1=${v1} v999=${v999}
del2=${del2} del800=${del800} ins1010=${ins1010}")
+ cnt.get(0).get(0) == expectedRows &&
+ v1 != null && v1.toString() == '99' &&
+ v999 != null && v999.toString() == '99' &&
+ del2.get(0).get(0) == 0 &&
+ del800.get(0).get(0) == 0 &&
+ ins1010.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // The decoy table must not have been created on the Doris side.
+ def showUnrelated = sql """show tables from ${currentDb} like
'${unrelated}'"""
+ assert showUnrelated.size() == 0
+
+ // Record final count, all updated ids, remaining deleted ids, and the inserts.
+ qt_select_count """select count(1) from ${currentDb}.${table1}"""
+ qt_select_updates """select id, version from ${currentDb}.${table1}
where id in (1, 100, 500, 999) order by id"""
+ qt_select_deletes """select count(1) from ${currentDb}.${table1} where
id in (2, 200, 800)"""
+ qt_select_inserts """select id, tag, version from
${currentDb}.${table1} where id > ${totalRows} order by id"""
+
+ // Drop the job and confirm it is no longer listed by jobs("type"="insert").
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
index c367f4646e2..b367e8fefb7 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy
@@ -114,19 +114,45 @@
suite("test_streaming_postgres_job_async_split_pause_resume",
throw ex
}
- // Capture state, sleep, recapture — succeedTaskCount must not grow
while paused.
- def succeedAtPause = sql("""select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}'""")
- .get(0).get(0).toString()
- def rowsAtPause = sql("""SELECT COUNT(*) FROM
${currentDb}.${table1}""").get(0).get(0) as int
- sleep(15000)
+ // PAUSE is best-effort on the FE side: it stops new chunk dispatch,
but any chunk
+ // already in flight on cdc_client keeps running until its stream load
commits. Wait
+ // for both SucceedTaskCount and row count to stay flat across a
window that is wider
+ // than the post-pause observation sleep below — two equal samples a
few seconds apart
+ // are not a strong enough signal: a slow chunk could commit after the
would-be settle
+ // and trip the row-growth assertion in the next step.
+ long lastRows = -1L
+ String lastSucceed = ""
+ int stableCount = 0
+ final int requiredStable = 4
+ Awaitility.await().atMost(120, SECONDS).pollInterval(4,
SECONDS).until({
+ long curRows = sql("""SELECT COUNT(*) FROM
${currentDb}.${table1}""").get(0).get(0) as long
+ String curSucceed = sql("""select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}'""")
+ .get(0).get(0).toString()
+ if (curRows == lastRows && curSucceed == lastSucceed) {
+ stableCount++
+ } else {
+ stableCount = 1
+ lastRows = curRows
+ lastSucceed = curSucceed
+ }
+ log.info("pause-settle: rows=${curRows} succeed=${curSucceed}
stable=${stableCount}/${requiredStable}")
+ stableCount >= requiredStable
+ })
+
+ // Capture state, sleep, recapture — succeedTaskCount must not grow
while paused,
+ // and row count must not grow once the in-flight chunk has settled.
The sleep below
+ // is intentionally shorter than the stability window above.
+ def succeedAtPause = lastSucceed
+ def rowsAtPause = lastRows
+ sleep(10000)
def succeedAfterSleep = sql("""select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}'""")
.get(0).get(0).toString()
- def rowsAfterSleep = sql("""SELECT COUNT(*) FROM
${currentDb}.${table1}""").get(0).get(0) as int
+ def rowsAfterSleep = sql("""SELECT COUNT(*) FROM
${currentDb}.${table1}""").get(0).get(0) as long
log.info("paused: succeed ${succeedAtPause}->${succeedAfterSleep} rows
${rowsAtPause}->${rowsAfterSleep}")
assert succeedAfterSleep == succeedAtPause :
"SucceedTaskCount grew while paused (${succeedAtPause} ->
${succeedAfterSleep}) — splitter not stopped"
assert rowsAfterSleep == rowsAtPause :
- "row count grew while paused (${rowsAtPause} ->
${rowsAfterSleep}) — tasks still running"
+ "row count grew after PAUSE settled (${rowsAtPause} ->
${rowsAfterSleep}) — new tasks dispatched while paused"
def pausedStatus = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
assert pausedStatus.get(0).get(0) == "PAUSED" : "job didn't stay
PAUSED"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
new file mode 100644
index 00000000000..80e580e91b8
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_composite_pk.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression: Postgres streaming CDC job over two composite-PK tables:
+// - an (org_id, item_id) table with extra payload columns;
+// - a (user_id, role_id) table whose columns are all part of the primary key.
+// Verifies the snapshot, the Doris UNIQUE KEY mapping, and INSERT/UPDATE/DELETE
+// that locate rows by both key columns.
+suite("test_streaming_postgres_job_composite_pk",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_composite_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_composite_pk_pg"
+ def table2 = "streaming_full_pk_map_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Remove any Doris-side job/tables left over from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${table2} force"""
+
+ // Everything below only runs when enableJdbcTest=true in the regression config.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ org_id integer NOT NULL,
+ item_id bigint NOT NULL,
+ item_no varchar(64),
+ qty integer,
+ PRIMARY KEY (org_id, item_id)
+ );
+ """
+
+ // Snapshot rows: 2 orgs x 5 items each.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1001,
'A_001', 10)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1002,
'A_002', 20)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1003,
'A_003', 30)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1004,
'A_004', 40)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1005,
'A_005', 50)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2001,
'B_001', 100)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2002,
'B_002', 200)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2003,
'B_003', 300)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2004,
'B_004', 400)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2005,
'B_005', 500)"""
+
+ // Mapping table: every column is part of the primary key (2 users x 3 roles).
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table2} (
+ user_id integer NOT NULL,
+ role_id integer NOT NULL,
+ PRIMARY KEY (user_id, role_id)
+ );
+ """
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10,
100)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10,
101)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10,
102)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20,
200)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20,
201)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (20,
202)"""
+ }
+
+ // snapshot_split_size=3 -> chunks cross the org boundary.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "initial",
+ "snapshot_split_size" = "3"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait (up to 300s) for 10 rows in table1 and 6 rows in table2.
+ // On timeout, log the job and task rows before rethrowing.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt1 = sql """select count(1) from
${currentDb}.${table1}"""
+ def cnt2 = sql """select count(1) from
${currentDb}.${table2}"""
+ log.info("snapshot row count table1=${cnt1}
table2=${cnt2}")
+ cnt1.get(0).get(0) == 10 && cnt2.get(0).get(0) == 6
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // The auto-created Doris tables must use a UNIQUE KEY over the same PG
+ // primary-key columns, in the same order.
+ def showTbl = sql """show create table ${currentDb}.${table1}"""
+ def createInfo = showTbl[0][1]
+ log.info("create table: " + createInfo)
+ assert createInfo.contains("UNIQUE KEY(`org_id`, `item_id`)")
+
+ def showTbl2 = sql """show create table ${currentDb}.${table2}"""
+ def createInfo2 = showTbl2[0][1]
+ log.info("create table2: " + createInfo2)
+ assert createInfo2.contains("UNIQUE KEY(`user_id`, `role_id`)")
+
+ // Record the schema and the snapshot contents, ordered by the composite key.
+ qt_desc_composite_pk """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select org_id, item_id, item_no, qty from
${currentDb}.${table1} order by org_id, item_id;"""
+ qt_select_snapshot_map """select user_id, role_id from
${currentDb}.${table2} order by user_id, role_id;"""
+
+ // ===== Binlog phase: INSERT / UPDATE / DELETE that all require
composite-PK locating =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 1006,
'A_006', 60)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 2006,
'B_006', 600)"""
+
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET qty=999 WHERE
org_id=1 AND item_id=1003"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET qty=888 WHERE
org_id=2 AND item_id=2002"""
+
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE org_id=1
AND item_id=1001"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE org_id=2
AND item_id=2005"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (10,
103)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} VALUES (30,
300)"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table2} WHERE user_id=10
AND role_id=100"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table2} WHERE user_id=20
AND role_id=202"""
+ }
+
+ // Expected counts after the incremental changes:
+ // - table1: 10 + 2 inserts - 2 deletes = 10;
+ // - table2: 6 + 2 inserts - 2 deletes = 6.
+ // Also check the updated qty values and individual inserted/deleted keys.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def cntMap = sql """select count(1) from
${currentDb}.${table2}"""
+ def upd1 = sql """select qty from
${currentDb}.${table1} where org_id=1 and item_id=1003"""
+ def upd2 = sql """select qty from
${currentDb}.${table1} where org_id=2 and item_id=2002"""
+ def del1 = sql """select count(1) from
${currentDb}.${table1} where org_id=1 and item_id=1001"""
+ def del2 = sql """select count(1) from
${currentDb}.${table1} where org_id=2 and item_id=2005"""
+ def mapNew = sql """select count(1) from
${currentDb}.${table2} where user_id=30 and role_id=300"""
+ def mapDel = sql """select count(1) from
${currentDb}.${table2} where user_id=10 and role_id=100"""
+ def q1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def q2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ log.info("incr count=" + cnt + " map.count=" + cntMap
+ " upd1.qty=" + q1 + " upd2.qty=" + q2
+ + " del1.exists=" + del1 + " del2.exists=" +
del2
+ + " mapNew=" + mapNew + " mapDel=" + mapDel)
+ cnt.get(0).get(0) == 10 &&
+ cntMap.get(0).get(0) == 6 &&
+ q1 != null && q1.toString() == '999' &&
+ q2 != null && q2.toString() == '888' &&
+ del1.get(0).get(0) == 0 &&
+ del2.get(0).get(0) == 0 &&
+ mapNew.get(0).get(0) == 1 &&
+ mapDel.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ // Record the post-incremental contents, ordered by the composite key.
+ qt_select_after_incr """select org_id, item_id, item_no, qty from
${currentDb}.${table1} order by org_id, item_id;"""
+ qt_select_after_incr_map """select user_id, role_id from
${currentDb}.${table2} order by user_id, role_id;"""
+
+ // Drop the job and confirm it is no longer listed by jobs("type"="insert").
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
new file mode 100644
index 00000000000..17ef08a469d
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_decimal_pk.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression suite: streaming CDC job from Postgres into Doris for a table
+// whose only primary-key column is numeric(20, 4).
+// Per the PR description, this exercises the BigDecimal arithmetic the chunk
+// splitter uses on DECIMAL keys (NOTE(review): splitter code not visible here).
+// Flow: seed PG table -> CREATE JOB -> wait for the 5-row snapshot ->
+// INSERT/UPDATE/DELETE on PG -> wait for the changes in Doris -> DROP JOB.
+suite("test_streaming_postgres_job_decimal_pk",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_decimal_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_decimal_pk_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Remove the job / Doris table left behind by a previous run (including a
+ // failed one, which exits before the DROP JOB at the end of this suite).
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ // Everything else runs only when enableJdbcTest is "true" in the
+ // regression config's otherConfigs.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ // Location of the Postgres JDBC driver jar in the regression S3 bucket;
+ // handed to the job as driver_url.
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Seed the source: a single-column DECIMAL PK with four small keys and one
+ // much larger key (9999999999999.9999), so the key range is wide.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ amount numeric(20, 4) NOT NULL,
+ payload varchar(64),
+ PRIMARY KEY (amount)
+ );
+ """
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1.0000,
'A1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2.5000,
'B1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3.7500,
'C1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (4.1234,
'D1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(9999999999999.9999, 'E1')"""
+ }
+
+ // Initial snapshot ("offset" = "initial") followed by streaming.
+ // snapshot_split_size=2 keeps splits small so the 5 seed rows presumably
+ // span several DECIMAL-bounded splits (TODO confirm the unit is rows).
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Phase 1: poll every 2s (up to 300s) until all 5 seed rows are in Doris.
+ // On timeout, log the job and task rows for diagnosis and rethrow.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ // NOTE(review): rethrowing here (and in the phase-2 catch) skips the
+ // DROP JOB at the end; the job is only removed by the DROP JOB IF
+ // EXISTS at the start of the next run.
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot """select amount, payload from
${currentDb}.${table1} order by amount asc"""
+
+ // Phase 2: change the source after the snapshot - insert a new key,
+ // update the payload of amount=2.5000, delete amount=4.1234.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5.5555,
'F2')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET payload='B2_upd'
WHERE amount=2.5000"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE
amount=4.1234"""
+ }
+
+ // Expected: 5 + 1 insert - 1 delete = 5 rows, amount=2.5000 carries the
+ // new payload and amount=4.1234 is gone. Polls every 2s for up to 180s.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd = sql """select payload from
${currentDb}.${table1} where amount=2.5000"""
+ def del = sql """select count(1) from
${currentDb}.${table1} where amount=4.1234"""
+ def p = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} upd.payload=${p}
del.exists=${del}")
+ cnt.get(0).get(0) == 5 && p == 'B2_upd' &&
del.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select amount, payload from
${currentDb}.${table1} order by amount asc"""
+
+ // Tear down and verify the job no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
new file mode 100644
index 00000000000..c4c913100b9
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_id_gap_completeness.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression suite: Postgres -> Doris streaming CDC job on a bigint PK with a
+// large gap in the key space: dense ids 1..100, one outlier id 10000000, and
+// 100 more ids 10000001..10000100 right after it (201 rows total).
+// Per the PR description, it checks that the snapshot does not drop the rows
+// past the outlier, and that later INSERT/UPDATE/DELETE events in the
+// different id regions are applied.
+suite("test_streaming_postgres_job_id_gap_completeness",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_id_gap_completeness_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_id_gap_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Remove the job / Doris table left behind by a previous run (including a
+ // failed one, which exits before the DROP JOB at the end of this suite).
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ // Everything else runs only when enableJdbcTest is "true" in the
+ // regression config's otherConfigs.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ // Location of the Postgres JDBC driver jar in the regression S3 bucket;
+ // handed to the job as driver_url.
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ // Three seed inserts: 100 dense rows, 1 outlier, 100 post-outlier rows.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id bigint PRIMARY KEY,
+ tag varchar(64),
+ version integer
+ );
+ """
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+ SELECT g, 'dense', 0 FROM generate_series(1, 100) g"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag,
version) VALUES (10000000, 'outlier', 0)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+ SELECT g, 'post_outlier', 0 FROM generate_series(10000001,
10000100) g"""
+ }
+
+ // Initial snapshot ("offset" = "initial") followed by streaming, with
+ // snapshot_split_size=20 (TODO confirm the unit is rows).
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "20"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Snapshot phase: poll every 2s (up to 300s) until all 201 seed rows are in
+ // Doris. On timeout, log the job and task rows for diagnosis and rethrow.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 201
+ }
+ )
+ } catch (Exception ex) {
+ // NOTE(review): rethrowing here (and in the incremental-phase catch)
+ // skips the DROP JOB at the end; the job is only removed by the DROP
+ // JOB IF EXISTS at the start of the next run.
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // Beyond the total count, check distinct ids and each id region on its
+ // own, so a missing region cannot be masked by extra rows elsewhere.
+ def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM
${currentDb}.${table1}"""
+ assert distinctCount.get(0).get(0) == 201
+ def outlierExists = sql """SELECT COUNT(1) FROM ${currentDb}.${table1}
WHERE id=10000000"""
+ assert outlierExists.get(0).get(0) == 1
+ def postOutlierCount = sql """SELECT COUNT(1) FROM
${currentDb}.${table1} WHERE id BETWEEN 10000001 AND 10000100"""
+ assert postOutlierCount.get(0).get(0) == 100
+
+ qt_select_snapshot_count """select count(1) from
${currentDb}.${table1}"""
+ qt_select_snapshot_dense_sample """select id, tag from
${currentDb}.${table1} where id in (1, 50, 100) order by id"""
+ qt_select_snapshot_outlier_sample """select id, tag from
${currentDb}.${table1} where id in (10000000, 10000050, 10000100) order by id"""
+
+ // ===== Incremental phase: changes in dense + outlier + post-outlier ranges =====
+ // INSERT a new id beyond the post-outlier range (10000200), UPDATE a dense
+ // row (id=50), DELETE the outlier (id=10000000).
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag,
version) VALUES (10000200, 'incr_outlier', 1)"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE
id=50"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE
id=10000000"""
+ }
+
+ // Expected: 201 - 1 delete + 1 insert = 201 rows, id=50 at version 99,
+ // id=10000000 gone, id=10000200 present. Polls every 2s for up to 180s.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd = sql """select version from
${currentDb}.${table1} where id=50"""
+ def del = sql """select count(1) from
${currentDb}.${table1} where id=10000000"""
+ def ins = sql """select count(1) from
${currentDb}.${table1} where id=10000200"""
+ def v = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} id50.version=${v}
id10000000.exists=${del} id10000200.exists=${ins}")
+ cnt.get(0).get(0) == 201 &&
+ v != null && v.toString() == '99' &&
+ del.get(0).get(0) == 0 &&
+ ins.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr_count """select count(1) from
${currentDb}.${table1}"""
+ qt_select_after_incr_changed """select id, tag, version from
${currentDb}.${table1} where id in (50, 10000200) order by id"""
+
+ // Tear down and verify the job no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
new file mode 100644
index 00000000000..1694862248a
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression suite: Postgres -> Doris streaming CDC job where the source table
+// is modified right after the job is created, i.e. while the initial snapshot
+// is expected to still be running. It also creates an unrelated PG table that
+// is NOT listed in include_tables and checks that it never appears in Doris.
+suite("test_streaming_postgres_job_snapshot_with_concurrent_dml",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName =
"test_streaming_postgres_job_snapshot_with_concurrent_dml_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_snapshot_dml_pg"
+ def unrelated = "streaming_snapshot_dml_unrelated_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ // Number of seed rows (ids 1..totalRows) in the snapshot.
+ def totalRows = 1000
+
+ // Remove the job / Doris tables left behind by a previous run (including a
+ // failed one, which exits before the DROP JOB at the end of this suite).
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${unrelated} force"""
+
+ // Everything else runs only when enableJdbcTest is "true" in the
+ // regression config's otherConfigs.
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ // Location of the Postgres JDBC driver jar in the regression S3 bucket;
+ // handed to the job as driver_url.
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${unrelated}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(64),
+ version integer
+ );
+ """
+ // 1000 (totalRows) snapshot rows via generate_series, all at version 0.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag, version)
+ SELECT g, 'snap', 0 FROM generate_series(1, ${totalRows})
g"""
+
+ // Decoy table with one pre-existing row; it is never listed in
+ // include_tables.
+ sql """
+ create table ${pgDB}.${pgSchema}.${unrelated} (
+ id integer PRIMARY KEY,
+ tag varchar(64)
+ );
+ """
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${unrelated} (id, tag)
VALUES (1, 'pre_snap')"""
+ }
+
+ // snapshot_split_size=10 + snapshot_parallelism=1 -> ~100 serial splits
+ // (assuming the split size counts rows), slow enough that the concurrent
+ // DML below is expected to overlap with the snapshot.
+ // NOTE(review): the overlap is timing-dependent; the checks below only
+ // verify the converged final state, so they also pass if the DML happens
+ // to land after the snapshot has finished.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "10",
+ "snapshot_parallelism" = "1"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Concurrent DML on source while cdc-client is still snapshotting.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // 10 single-row INSERTs with ids totalRows+1..totalRows+10, above the
+ // snapshot key range.
+ for (int i = 1; i <= 10; i++) {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag,
version) VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+ }
+ // One UPDATE touching 4 rows spread across the key range (1, 100, 500, 999).
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE
id IN (1, 100, 500, 999)"""
+ // One DELETE removing 3 rows spread across the key range (2, 200, 800).
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id IN (2,
200, 800)"""
+
+ // DML on the unrelated table - must NOT leak into Doris (it is not in include_tables).
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${unrelated} (id, tag)
VALUES (2, 'concurrent_unrelated_ins')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${unrelated} SET
tag='concurrent_unrelated_upd' WHERE id=1"""
+ }
+
+ // Wait (poll every 2s, up to 600s) until the Doris table converges to the
+ // source state: 1000 + 10 inserts - 3 deletes = 1007 rows. Only a sample of
+ // the changed rows is checked here; the qt_ queries below cover the rest.
+ def expectedRows = totalRows + 10 - 3
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select version from
${currentDb}.${table1} where id=1"""
+ def upd999 = sql """select version from
${currentDb}.${table1} where id=999"""
+ def del2 = sql """select count(1) from
${currentDb}.${table1} where id=2"""
+ def del800 = sql """select count(1) from
${currentDb}.${table1} where id=800"""
+ def ins1010 = sql """select count(1) from
${currentDb}.${table1} where id=${totalRows + 10}"""
+ def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def v999 = upd999.size() == 0 ? null :
upd999.get(0).get(0)
+ log.info("incr cnt=${cnt} v1=${v1} v999=${v999}
del2=${del2} del800=${del800} ins1010=${ins1010}")
+ cnt.get(0).get(0) == expectedRows &&
+ v1 != null && v1.toString() == '99' &&
+ v999 != null && v999.toString() == '99' &&
+ del2.get(0).get(0) == 0 &&
+ del800.get(0).get(0) == 0 &&
+ ins1010.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ // NOTE(review): rethrowing skips the DROP JOB at the end; the job is
+ // only removed by the DROP JOB IF EXISTS at the start of the next run.
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // The unrelated table must NOT have been created/synced in Doris.
+ def showUnrelated = sql """show tables from ${currentDb} like
'${unrelated}'"""
+ assert showUnrelated.size() == 0
+
+ qt_select_count """select count(1) from ${currentDb}.${table1}"""
+ qt_select_updates """select id, version from ${currentDb}.${table1}
where id in (1, 100, 500, 999) order by id"""
+ qt_select_deletes """select count(1) from ${currentDb}.${table1} where
id in (2, 200, 800)"""
+ qt_select_inserts """select id, tag, version from
${currentDb}.${table1} where id > ${totalRows} order by id"""
+
+ // Tear down and verify the job no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]