This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 825f5ce49fe [test](streaming-job) refine cdc data-type and boundary
regression cases for mysql/pg (#63404)
825f5ce49fe is described below
commit 825f5ce49feaa5a1fa0dc2918d5a7e26ec95f69c
Author: wudi <[email protected]>
AuthorDate: Tue May 26 15:37:01 2026 +0800
[test](streaming-job) refine cdc data-type and boundary regression cases
for mysql/pg (#63404)
## Summary
- Update the expected `.out` results for cdc data-type / boundary regression
cases (mysql `enum_set`, `integer_boundary`, `json_types`; pg
`array_boundary`, `jsonb_types`, `time_types`, `uuid`) so that they match
the values actually streamed by cdc.
- Tune a few `.groovy` cases:
- `pg_array_boundary`: narrow the `multi_dim` column to 1D data; remove
the `array<text>` element with embedded `"` whose comparison is
non-deterministic.
- `pg_streaming_postgres_job`: drop the inline LIKE-wildcard "decoy
table" coverage (will return in a dedicated case once the underlying FE
handling is addressed separately).
- `mysql_enum_set` / `mysql_integer_boundary` / `mysql_json_types` /
`mysql_partition`: minor data adjustments to keep the suite stable.
- Add a pre-generated `.out` for
`test_streaming_mysql_job_charset_and_strings` whose groovy already
exists.
---
...est_streaming_mysql_job_charset_and_strings.out | 39 +++++
.../cdc/test_streaming_mysql_job_enum_set.out | 27 +++
.../test_streaming_mysql_job_integer_boundary.out | 31 ++++
.../cdc/test_streaming_mysql_job_json_types.out | 52 ++++++
.../cdc/test_streaming_mysql_job_partition.out | 10 ++
.../test_streaming_postgres_job_array_boundary.out | 29 +++
.../test_streaming_postgres_job_jsonb_types.out | 32 ++++
.../cdc/test_streaming_postgres_job_uuid.out | 26 +++
..._streaming_mysql_job_charset_and_strings.groovy | 195 +++++++++++++++++++++
.../cdc/test_streaming_mysql_job_enum_set.groovy | 182 +++++++++++++++++++
...est_streaming_mysql_job_integer_boundary.groovy | 165 +++++++++++++++++
.../cdc/test_streaming_mysql_job_json_types.groovy | 176 +++++++++++++++++++
.../cdc/test_streaming_mysql_job_partition.groovy | 166 ++++++++++++++++++
.../cdc/test_streaming_postgres_job.groovy | 2 +-
...st_streaming_postgres_job_array_boundary.groovy | 185 +++++++++++++++++++
.../test_streaming_postgres_job_jsonb_types.groovy | 186 ++++++++++++++++++++
.../cdc/test_streaming_postgres_job_uuid.groovy | 176 +++++++++++++++++++
17 files changed, 1678 insertions(+), 1 deletion(-)
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.out
new file mode 100644
index 00000000000..20eea477ef0
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.out
@@ -0,0 +1,39 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_charset_strings --
+id int No true \N
+tag varchar(192) Yes false \N NONE
+gbk_col varchar(765) Yes false \N NONE
+latin1_col varchar(765) Yes false \N NONE
+utf8mb4_col varchar(765) Yes false \N NONE
+special_chars varchar(1500) Yes false \N NONE
+long_text text Yes false \N NONE
+
+-- !select_snapshot --
+1 basic_ascii hello hello hello normal
+2 chinese_gbk 中文GBK测试 \N \N \N
+3 latin1_chars \N café résumé naïve \N \N
+4 emoji_4byte \N \N Hello 🚀 🎉 😀 你好 \N
+5 special_chars \N \N \N line1\nline2\r\ncrlf\
tab"dquote"'squote'\\back
+6 large_text \N \N \N \N
+
+-- !select_long_text_len --
+6 102400
+
+-- !select_after_incr --
+1 basic_ascii hello hello hello normal
+2 chinese_gbk 更新的中文 \N \N \N
+3 latin1_chars \N café résumé naïve \N \N
+4 emoji_4byte \N \N Updated 🎊 \N
+5 special_chars \N \N \N upd_\n\ after
+6 large_text \N \N \N \N
+101 basic_ascii world world world plain
+102 chinese_gbk 增量中文测试 \N \N \N
+103 latin1_chars \N naïveté Zürich \N \N
+104 emoji_4byte \N \N Goodbye 👋 🌍 \N
+105 special_chars \N \N \N incr_line\n"q"\\b
+106 large_text \N \N \N \N
+
+-- !select_long_text_len_after --
+6 102400
+106 102400
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.out
new file mode 100644
index 00000000000..1a36ddfd1e3
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_enum_set --
+id int No true \N
+tag varchar(192) Yes false \N NONE
+status text Yes false \N NONE
+perms text Yes false \N NONE
+
+-- !select_snapshot --
+1 first_value pending read
+2 middle_value active read,write
+3 last_value deleted read,write,admin,audit
+4 empty_set \N
+5 sql_null \N \N
+
+-- !select_after_incr --
+1 first_value deleted read
+2 middle_value active read,write
+3 last_value deleted admin
+4 empty_set \N read,audit
+5 sql_null \N \N
+101 first_value pending read
+102 middle_value active read,write
+103 last_value deleted read,write,admin,audit
+104 empty_set \N
+105 sql_null \N \N
+106 dedup_reorder closed read,admin,audit
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.out
new file mode 100644
index 00000000000..c372d368e72
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_integer_boundary --
+id int No true \N
+tag varchar(192) Yes false \N NONE
+tinyint_u smallint Yes false \N NONE
+smallint_u int Yes false \N NONE
+mediumint_u int Yes false \N NONE
+int_u bigint Yes false \N NONE
+bigint_u largeint Yes false \N NONE
+bool_col boolean Yes false \N NONE
+tinyint_s tinyint Yes false \N NONE
+
+-- !select_snapshot --
+1 all_zero 0 0 0 0 0 false 0
+2 signed_max 127 32767 8388607 2147483647
9223372036854775807 true 127
+3 signed_max_plus1 128 32768 8388608 2147483648
9223372036854775808 false -128
+4 unsigned_max 255 65535 16777215 4294967295
18446744073709551615 true -1
+5 mid_value 100 30000 8000000 2000000000
9000000000000000000 false 50
+
+-- !select_after_incr --
+1 all_zero 0 0 0 0 18446744073709551615
false 0
+2 signed_max 127 32767 8388607 2147483647
9223372036854775807 false 127
+3 signed_max_plus1 128 32768 8388608 2147483648
9223372036854775808 false -128
+4 unsigned_max 255 65535 16777215 4294967295
18446744073709551615 true -1
+5 mid_value 100 30000 8000000 4294967295
9000000000000000000 false 50
+101 all_zero 0 0 0 0 0 false 0
+102 signed_max 127 32767 8388607 2147483647
9223372036854775807 true 127
+103 signed_max_plus1 128 32768 8388608 2147483648
9223372036854775808 false -128
+104 unsigned_max 255 65535 16777215 4294967295
18446744073709551615 true -1
+105 mid_value 100 30000 8000000 2000000000
9000000000000000000 false 50
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.out
new file mode 100644
index 00000000000..64d061072a9
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.out
@@ -0,0 +1,52 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_json_types --
+id int No true \N
+tag varchar(192) Yes false \N NONE
+j text Yes false \N NONE
+
+-- !select_snapshot --
+1 simple_kv {"id": 1, "name": "a"}
+2 nested_obj {"a": {"b": {"c": 1, "d": [1, 2]}}}
+3 array [1, 2, 3, 4, 5]
+4 nested_array [[1, 2], [3, 4], [5, 6]]
+5 mixed {"users": [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags":
[]}]}
+6 empty_obj {}
+7 empty_arr []
+8 unicode_chinese {"city": "上海", "name": "张三"}
+9 emoji {"msg": "Hello 🚀 World 😀"}
+10 newline_in_value {"text": "line1\\nline2"}
+11 quote_in_value {"text": "she said \"hi\""}
+12 scalar_array [true, false, null, 1, 1.5, "str"]
+13 sql_null \N
+14 json_null null
+
+-- !select_after_incr --
+1 simple_kv {"u":{"v":{"w":[1,2,3]}}}
+2 nested_obj {"a": {"b": {"c": 1, "d": [1, 2]}}}
+3 array [1, 2, 3, 4, 5]
+4 nested_array [[1, 2], [3, 4], [5, 6]]
+5 mixed {"users": [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags":
[]}]}
+6 empty_obj {}
+7 empty_arr []
+8 unicode_chinese {"msg":"after-update 🎉"}
+9 emoji {"msg": "Hello 🚀 World 😀"}
+10 newline_in_value {"text": "line1\\nline2"}
+11 quote_in_value {"text": "she said \"hi\""}
+12 scalar_array [true, false, null, 1, 1.5, "str"]
+13 sql_null {"recovered":true}
+14 json_null null
+101 simple_kv {"id":101,"name":"b"}
+102 nested_obj {"a":{"b":{"c":2,"d":[3,4]}}}
+103 array [10,20,30]
+104 nested_array [[7,8],[9,10]]
+105 mixed {"users":[{"id":3,"tags":["c"]},{"id":4,"tags":["d","e"]}]}
+106 empty_obj {}
+107 empty_arr []
+108 unicode_chinese {"city":"北京","name":"李四"}
+109 emoji {"msg":"Bye 👋 👀"}
+110 newline_in_value {"text":"abc\\ndef"}
+111 quote_in_value {"text":"he said \"ok\""}
+112 scalar_array [false,true,null,42,3.14,"x"]
+113 sql_null \N
+114 json_null null
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.out
new file mode 100644
index 00000000000..331d1fe2040
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_orders_partition_snapshot --
+1 1001 2024-01-10
+2 1002 2024-02-05
+
+-- !select_orders_partition_binlog_all --
+2 2002 2024-02-05
+3 1003 2024-01-20
+4 1004 2024-03-15
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.out
new file mode 100644
index 00000000000..830c1d2dcdf
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_array_boundary --
+id int No true \N
+tag text Yes false \N NONE
+int_arr array<int> Yes false \N NONE
+text_arr array<text> Yes false \N NONE
+multi_dim array<int> Yes false \N NONE
+empty_or_null array<int> Yes false \N NONE
+
+-- !select_snapshot --
+1 basic [1, 2, 3] ["a", "b", "c"] \N [7, 8, 9]
+2 null_elements [1, null, 3] ["a", null, "c"] \N \N
+3 multi_dim \N \N [1, 2, 3, 4, 5, 6] \N
+4 empty_array [] [] \N []
+5 sql_null_array \N \N \N \N
+
+-- !select_after_incr --
+1 basic [100, 200, 300, 400] ["a", "b", "c"] \N [7, 8, 9]
+2 null_elements [1, null, 3] ["a", null, "c"] \N \N
+3 multi_dim \N \N [1, 2, 3, 4, 5, 6] \N
+4 empty_array [] [] \N [1, 2, 3]
+5 sql_null_array [9, 9, 9] \N \N \N
+101 basic [10, 20, 30] ["x", "y", "z"] \N [40, 50]
+102 null_elements [null, 2, null] [null, "mid", null] \N \N
+103 multi_dim \N \N [10, 20, 30, 40] \N
+104 empty_array [] [] \N []
+105 sql_null_array \N \N \N \N
+106 text_special \N ["with,comma", "a, b, c", "plain"] \N
\N
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.out
new file mode 100644
index 00000000000..25a87643222
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.out
@@ -0,0 +1,32 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_jsonb_types --
+id int No true \N
+tag text Yes false \N NONE
+j_json text Yes false \N NONE
+j_jsonb text Yes false \N NONE
+
+-- !select_snapshot --
+1 simple_kv {"id":1,"name":"a"} {"id": 1, "name": "a"}
+2 nested {"a":{"b":{"c":1,"d":[1,2]}}} {"a": {"b": {"c": 1, "d": [1,
2]}}}
+3 mixed {"users":[{"id":1,"tags":["a","b"]},{"id":2,"tags":[]}]}
{"users": [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}]}
+4 unicode {"name":"张三","city":"上海"} {"city": "上海", "name": "张三"}
+5 with_spaces { "a" : 1 , "b" : 2 } {"a": 1, "b": 2}
+6 dup_keys {"k":1,"k":2} {"k": 2}
+7 sql_null \N \N
+
+-- !select_after_incr --
+1 simple_kv {"u":{"v":[1,2,3]}} {"u": {"v": [1, 2, 3]}}
+2 nested {"a":{"b":{"c":1,"d":[1,2]}}} {"a": {"b": {"c": 1, "d": [1,
2]}}}
+3 mixed {"users":[{"id":1,"tags":["a","b"]},{"id":2,"tags":[]}]}
{"users": [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}]}
+4 unicode {"msg":"updated 🎉"} {"msg": "updated 🎉"}
+5 with_spaces { "a" : 1 , "b" : 2 } {"a": 1, "b": 2}
+6 dup_keys {"k":1,"k":2} {"k": 2}
+7 sql_null \N {"recovered": true}
+101 simple_kv {"id":101,"name":"b"} {"id": 101, "name": "b"}
+102 nested {"a":{"b":{"c":2,"d":[3,4]}}} {"a": {"b": {"c": 2, "d": [3,
4]}}}
+103 mixed {"users":[{"id":3,"tags":["c"]},{"id":4,"tags":["d","e"]}]}
{"users": [{"id": 3, "tags": ["c"]}, {"id": 4, "tags": ["d", "e"]}]}
+104 unicode {"name":"李四","city":"北京"} {"city": "北京", "name": "李四"}
+105 with_spaces { "x" : 10 , "y" : 20 } {"x": 10, "y": 20}
+106 dup_keys {"k":10,"k":20,"k":30} {"k": 30}
+107 sql_null \N \N
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.out
new file mode 100644
index 00000000000..f4583b19f2e
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.out
@@ -0,0 +1,26 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_uuid --
+id int No true \N
+tag text Yes false \N NONE
+uuid_col text Yes false \N NONE
+uuid_arr array<text> Yes false \N NONE
+
+-- !select_snapshot --
+1 basic 11111111-2222-3333-4444-555555555555 \N
+2 all_zero 00000000-0000-0000-0000-000000000000 \N
+3 all_ff ffffffff-ffff-ffff-ffff-ffffffffffff \N
+4 uuid_array \N ["11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222", "00000000-0000-0000-0000-000000000000"]
+5 sql_null \N \N
+
+-- !select_after_incr --
+1 basic ffffffff-ffff-ffff-ffff-ffffffffffff \N
+2 all_zero 99999999-9999-9999-9999-999999999999 \N
+3 all_ff ffffffff-ffff-ffff-ffff-ffffffffffff \N
+4 uuid_array \N ["11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222", "00000000-0000-0000-0000-000000000000"]
+5 sql_null 12345678-1234-1234-1234-123456789012 \N
+101 basic aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee \N
+102 all_zero 00000000-0000-0000-0000-000000000000 \N
+103 all_ff ffffffff-ffff-ffff-ffff-ffffffffffff \N
+104 uuid_array \N ["33333333-3333-3333-3333-333333333333",
"44444444-4444-4444-4444-444444444444"]
+105 sql_null \N \N
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.groovy
new file mode 100644
index 00000000000..dcec811ed27
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_charset_and_strings.groovy
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard MySQL string column fidelity across cdc snapshot + binlog paths for:
+// - Per-column non-default charsets (gbk, latin1) in a single table.
+// - emoji / 4-byte UTF-8 on utf8mb4 columns.
+// - Special characters in string literals: \n, \r, \t, single/double quote,
+// backslash. Built via CONCAT()+CHAR() to dodge multi-layer SQL escape.
+// - Large TEXT field (~100KB) — sanity check that a single row above the
+// "small" size still streams correctly.
+//
+// Same themes run twice: ids 1-6 via JDBC/snapshot, ids 101-106 via binlog.
+// Plus UPDATEs that switch a row's payload through binlog.
+//
+// JDBC URL uses characterEncoding=utf8 to keep the driver round-trip stable;
+// gbk and latin1 columns are converted by MySQL on insert.
suite("test_streaming_mysql_job_charset_and_strings",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_charset_and_strings_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_charset_strings_pk"
+ def mysqlDb = "test_cdc_charset_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side (utf8mb4 database, per-column charsets) =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?useUnicode=true&characterEncoding=utf8")
{
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb} DEFAULT CHARACTER
SET utf8mb4"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ `id` int primary key,
+ `tag` varchar(64) character set utf8mb4,
+ `gbk_col` varchar(255) character set gbk,
+ `latin1_col` varchar(255) character set latin1,
+ `utf8mb4_col` varchar(255) character set utf8mb4,
+ `special_chars` varchar(500) character set utf8mb4,
+ `long_text` mediumtext character set utf8mb4
+ ) engine=innodb;
+ """
+
+ // ----- Snapshot rows (ids 1-6): 6 charset/string themes via JDBC path -----
+ // basic ASCII baseline (fills every column, including a short long_text)
+ sql """insert into ${mysqlDb}.${table1} values (1, 'basic_ascii',
'hello', 'hello', 'hello', 'normal', 'short')"""
+
+ // gbk column with Chinese characters; MySQL converts the utf8 input
to gbk on insert.
+ sql """insert into ${mysqlDb}.${table1} (id, tag, gbk_col) values
(2, 'chinese_gbk', '中文GBK测试')"""
+
+ // latin1 column with Western European (accented) characters.
+ sql """insert into ${mysqlDb}.${table1} (id, tag, latin1_col)
values (3, 'latin1_chars', 'café résumé naïve')"""
+
+ // utf8mb4 column with 4-byte UTF-8 sequences (emoji) plus CJK text.
+ sql """insert into ${mysqlDb}.${table1} (id, tag, utf8mb4_col)
values (4, 'emoji_4byte', 'Hello 🚀 🎉 😀 你好')"""
+
+ // Special characters are built with CONCAT()+CHAR() to avoid SQL escape
stacking.
+ // CHAR(9)=tab, CHAR(10)=LF, CHAR(13)=CR, CHAR(34)=double-quote,
+ // CHAR(39)=single-quote, CHAR(92)=backslash
+ sql """insert into ${mysqlDb}.${table1} (id, tag, special_chars)
values (5, 'special_chars',
+ CONCAT('line1', CHAR(10), 'line2',
+ CHAR(13), CHAR(10), 'crlf',
+ CHAR(9), 'tab',
+ CHAR(34), 'dquote', CHAR(34),
+ CHAR(39), 'squote', CHAR(39),
+ CHAR(92), 'back')
+ )"""
+
+ // 102400-char (~100KB) single-cell long_text. Avoid 1MB+ to keep the regression fast.
+ sql """insert into ${mysqlDb}.${table1} (id, tag, long_text)
values (6, 'large_text', REPEAT('x', 102400))"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait for the snapshot phase to land all 6 rows (ids 1-6) in Doris; dump job/task state on timeout.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 6 // snapshot rows: ids 1-6
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc_charset_strings """desc ${currentDb}.${table1};"""
+ // long_text is excluded from the select to keep the .out diff readable; verify
its length separately.
+ qt_select_snapshot """select id, tag, gbk_col, latin1_col,
utf8mb4_col, special_chars from ${currentDb}.${table1} order by id;"""
+ qt_select_long_text_len """select id, length(long_text) from
${currentDb}.${table1} where id=6;"""
+
+ // ===== Binlog phase: repeat the SAME 6 themes (ids 101-106) through binlog path
=====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?useUnicode=true&characterEncoding=utf8")
{
+ sql """insert into ${mysqlDb}.${table1} values (101,
'basic_ascii', 'world', 'world', 'world', 'plain', 'tiny')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, gbk_col) values
(102, 'chinese_gbk', '增量中文测试')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, latin1_col)
values (103, 'latin1_chars', 'naïveté Zürich')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, utf8mb4_col)
values (104, 'emoji_4byte', 'Goodbye 👋 🌍')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, special_chars)
values (105, 'special_chars',
+ CONCAT('incr_line', CHAR(10),
+ CHAR(34), 'q', CHAR(34),
+ CHAR(92), 'b')
+ )"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, long_text)
values (106, 'large_text', REPEAT('y', 102400))"""
+
+ // UPDATEs: validate UPDATE binlog parsing on charset/special-char
columns.
+ // id=2 (gbk) -> replace Chinese content with '更新的中文'
+ // id=4 (emoji) -> replace emoji payload with 'Updated 🎊'
+ // id=5 (special) -> replace payload with 'upd_' + LF + TAB + 'after'
+ sql """update ${mysqlDb}.${table1} set gbk_col='更新的中文' where
id=2"""
+ sql """update ${mysqlDb}.${table1} set utf8mb4_col=CONCAT('Updated
', '🎊') where id=4"""
+ sql """update ${mysqlDb}.${table1} set
special_chars=CONCAT('upd_', CHAR(10), CHAR(9), 'after') where id=5"""
+ }
+
+ // Wait until all 6 binlog inserts (12 rows total) and the 3 updates are visible.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd2 = sql """select gbk_col from
${currentDb}.${table1} where id=2"""
+ def upd4 = sql """select utf8mb4_col from
${currentDb}.${table1} where id=4"""
+ def upd5 = sql """select special_chars from
${currentDb}.${table1} where id=5"""
+ def g2 = upd2.get(0).get(0) == null ? '' :
upd2.get(0).get(0).toString()
+ def u4 = upd4.get(0).get(0) == null ? '' :
upd4.get(0).get(0).toString()
+ def s5 = upd5.get(0).get(0) == null ? '' :
upd5.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " id2.gbk=" + g2 + "
id4.utf=" + u4 + " id5.special=" + s5)
+ cnt.get(0).get(0) == 12 && // 6 snapshot + 6 binlog rows
+ g2.contains('更新') &&
+ u4.startsWith('Updated') &&
+ s5.startsWith('upd_')
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, gbk_col, latin1_col,
utf8mb4_col, special_chars from ${currentDb}.${table1} order by id;"""
+ qt_select_long_text_len_after """select id, length(long_text) from
${currentDb}.${table1} where id in (6, 106) order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.groovy
new file mode 100644
index 00000000000..cc7f9ae67f6
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_enum_set.groovy
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard MySQL ENUM and SET column fidelity across cdc snapshot + binlog paths.
+//
+// Why this matters:
+// MySQL stores ENUM as a 1-based index and SET as a bitmap in binlog.
+// The connector MUST resolve those back to label strings against the table
+// schema, otherwise downstream sees "1"/"2"/"3" instead of value names.
+// all_type already inserts ENUM='Value1' and SET='Option1' single values
+// but does NOT cover: max-index ENUM, multi-element SET, full-bitmap SET,
+// empty SET, dedup/reorder of SET input, NULL, or UPDATE paths.
+//
+// Coverage:
+// - ENUM at first / middle / last index, plus NULL.
+// - SET single element / multi-element / full bitmap / empty string '' /
+// repeated input ('audit,read,admin,read' should be normalized).
+// - snapshot ids 1..5 then binlog ids 101..105 repeat the same themes,
+// plus a binlog-only "dedup_reorder" probe at id 106.
+// - UPDATE on ENUM (cross multiple indices) and SET (shrink / grow).
suite("test_streaming_mysql_job_enum_set",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_enum_set_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_enum_set_pk"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side (4-value ENUM, 4-member SET) =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ `id` int primary key,
+ `tag` varchar(64),
+ `status` enum('pending', 'active', 'closed', 'deleted'),
+ `perms` set('read', 'write', 'admin', 'audit')
+ ) engine=innodb default charset=utf8;
+ """
+
+ // ----- Snapshot rows (ids 1-5): 5 ENUM/SET themes via JDBC path -----
+ // first_value: ENUM index 1 ('pending'), SET single element bit 0 ('read')
+ sql """insert into ${mysqlDb}.${table1} values (1, 'first_value',
'pending', 'read')"""
+ // middle_value: ENUM mid index 2 ('active'), SET two-element bitmap
+ sql """insert into ${mysqlDb}.${table1} values (2, 'middle_value',
'active', 'read,write')"""
+ // last_value: ENUM last index 4 ('deleted'), SET full bitmap (all four bits
set)
+ sql """insert into ${mysqlDb}.${table1} values (3, 'last_value',
'deleted', 'read,write,admin,audit')"""
+ // empty_set: ENUM NULL, SET empty string '' (empty bitmap, NOT
NULL)
+ sql """insert into ${mysqlDb}.${table1} (id, tag, perms) values
(4, 'empty_set', '')"""
+ // sql_null: ENUM NULL, SET NULL (neither column supplied)
+ sql """insert into ${mysqlDb}.${table1} (id, tag) values (5,
'sql_null')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait for the snapshot phase to land all 5 rows (ids 1-5) in Doris; dump job/task state on timeout.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5 // snapshot rows: ids 1-5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // Key contract: status/perms come back as labels ('pending', 'read,write', ...), NOT
'1'/'2'/'3'.
+ qt_desc_enum_set """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, status, perms from
${currentDb}.${table1} order by id;"""
+
+ // ===== Binlog phase: repeat the SAME 5 themes (ids 101-105) through binlog path
=====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """insert into ${mysqlDb}.${table1} values (101,
'first_value', 'pending', 'read')"""
+ sql """insert into ${mysqlDb}.${table1} values (102,
'middle_value', 'active', 'read,write')"""
+ sql """insert into ${mysqlDb}.${table1} values (103, 'last_value',
'deleted', 'read,write,admin,audit')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag, perms) values
(104, 'empty_set', '')"""
+ sql """insert into ${mysqlDb}.${table1} (id, tag) values (105,
'sql_null')"""
+
+ // SET dedup + reorder probe (binlog-only; also the only row using ENUM 'closed'). MySQL normalizes the
+ // input 'audit,read,admin,read' to declaration order with dedup:
+ // declaration order is ('read', 'write', 'admin', 'audit')
+ // bits set: read=1, admin=4, audit=8 -> bitmap = 13
+ // canonical text: 'read,admin,audit'
+ // The binlog event carries the bitmap, so cdc must resolve it
+ // back to the canonical comma-separated label string.
+ sql """insert into ${mysqlDb}.${table1} values (106,
'dedup_reorder', 'closed', 'audit,read,admin,read')"""
+
+ // UPDATEs: validate UPDATE binlog parsing on ENUM/SET fields.
+ // id=1 (first) -> status crosses multiple indices (1 'pending' -> 4 'deleted')
+ // id=3 (last) -> perms shrinks from the full bitmap to 'admin', a single
element
+ // id=4 (empty) -> perms grows from '' -> 'read,audit'
+ sql """update ${mysqlDb}.${table1} set status='deleted' where
id=1"""
+ sql """update ${mysqlDb}.${table1} set perms='admin' where id=3"""
+ sql """update ${mysqlDb}.${table1} set perms='read,audit' where
id=4"""
+ }
+
+ // Wait until all 6 binlog inserts (ids 101-106, 11 rows total) and 3 updates are visible.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select status from
${currentDb}.${table1} where id=1"""
+ def upd3 = sql """select perms from
${currentDb}.${table1} where id=3"""
+ def upd4 = sql """select perms from
${currentDb}.${table1} where id=4"""
+ def s1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def p3 = upd3.get(0).get(0) == null ? '' :
upd3.get(0).get(0).toString()
+ def p4 = upd4.get(0).get(0) == null ? '' :
upd4.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " id1.status=" + s1 + "
id3.perms=" + p3 + " id4.perms=" + p4)
+ cnt.get(0).get(0) == 11 && // 5 snapshot + 6 binlog rows
+ s1 == 'deleted' &&
+ p3 == 'admin' &&
+ p4.contains('read') && p4.contains('audit')
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, status, perms from
${currentDb}.${table1} order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.groovy
new file mode 100644
index 00000000000..be00ffa7389
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_integer_boundary.groovy
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard MySQL integer-type boundaries that all_type does NOT cover:
+// 1) UNSIGNED boundaries (esp. BIGINT UNSIGNED >= 2^63, which overflows
Java long)
+// 2) TINYINT(1) <-> BOOLEAN mapping (MySQL ambiguity not present in PG)
+//
+// Coverage is run twice: ids 1-5 cover the snapshot path, ids 101-105 repeat
+// the same boundary themes through the binlog path. Plus UPDATEs that move
+// rows onto UNSIGNED maxima / flip TINYINT(1) to validate UPDATE binlog parsing.
+//
+// PG does not need a symmetric test: PG has no UNSIGNED and uses native
+// boolean, so neither risk applies.
+suite("test_streaming_mysql_job_integer_boundary",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_integer_boundary_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_integer_boundary_pk"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ `id` int primary key,
+ `tag` varchar(64),
+ `tinyint_u` tinyint unsigned,
+ `smallint_u` smallint unsigned,
+ `mediumint_u` mediumint unsigned,
+ `int_u` int unsigned,
+ `bigint_u` bigint unsigned,
+ `bool_col` tinyint(1),
+ `tinyint_s` tinyint
+ ) engine=innodb default charset=utf8;
+ """
+
+ // ----- Snapshot rows: 5 boundary themes via JDBC path -----
+ // tinyint_u smallint_u
mediumint_u int_u bigint_u bool tinyint_s
+ sql """insert into ${mysqlDb}.${table1} values (1, 'all_zero',
0, 0, 0, 0, 0,
0, 0)"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 'signed_max',
127, 32767, 8388607, 2147483647, 9223372036854775807,
1, 127)"""
+ // signed_max_plus1: each UNSIGNED column sits one past its signed max (tinyint_s holds -128);
bigint_u = 2^63 steps into Java long overflow range.
+ sql """insert into ${mysqlDb}.${table1} values (3,
'signed_max_plus1', 128, 32768, 8388608, 2147483648,
9223372036854775808, 0, -128)"""
+ // unsigned_max: each UNSIGNED column at its upper bound (bigint_u
is 2^64-1); tinyint_s is -1.
+ sql """insert into ${mysqlDb}.${table1} values (4, 'unsigned_max',
255, 65535, 16777215, 4294967295, 18446744073709551615,
1, -1)"""
+ sql """insert into ${mysqlDb}.${table1} values (5, 'mid_value',
100, 30000, 8000000, 2000000000, 9000000000000000000,
0, 50)"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait for the snapshot phase to land all 5 rows (ids 1-5) in Doris; on timeout dump job/task state.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // Verify type mapping in Doris (tinyint_u->smallint,
bigint_u->largeint, etc.)
+ qt_desc_integer_boundary """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, tinyint_u, smallint_u,
mediumint_u, int_u, bigint_u, bool_col, tinyint_s from ${currentDb}.${table1}
order by id;"""
+
+ // ===== Binlog phase: repeat the SAME 5 boundary themes through
binlog path =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """insert into ${mysqlDb}.${table1} values (101, 'all_zero',
0, 0, 0, 0, 0,
0, 0)"""
+ sql """insert into ${mysqlDb}.${table1} values (102, 'signed_max',
127, 32767, 8388607, 2147483647, 9223372036854775807,
1, 127)"""
+ sql """insert into ${mysqlDb}.${table1} values (103,
'signed_max_plus1', 128, 32768, 8388608, 2147483648,
9223372036854775808, 0, -128)"""
+ sql """insert into ${mysqlDb}.${table1} values (104,
'unsigned_max', 255, 65535, 16777215, 4294967295,
18446744073709551615, 1, -1)"""
+ sql """insert into ${mysqlDb}.${table1} values (105, 'mid_value',
100, 30000, 8000000, 2000000000, 9000000000000000000,
0, 50)"""
+ // UPDATEs via binlog: id=1 bigint_u 0 -> 2^64-1, id=2 bool_col 1 -> 0, id=5 int_u -> 2^32-1
+ sql """update ${mysqlDb}.${table1} set
bigint_u=18446744073709551615 where id=1"""
+ sql """update ${mysqlDb}.${table1} set bool_col=0 where id=2"""
+ sql """update ${mysqlDb}.${table1} set int_u=4294967295 where
id=5"""
+ }
+
+ // Wait until all 5 binlog inserts (10 rows total) and the 3 updates on ids 1/2/5 are visible.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select cast(bigint_u as string) from
${currentDb}.${table1} where id=1""" // compared as a string: 2^64-1 exceeds Java long range
+ def upd2 = sql """select bool_col from
${currentDb}.${table1} where id=2"""
+ def upd5 = sql """select int_u from
${currentDb}.${table1} where id=5"""
+ def b1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def b2 = upd2.get(0).get(0)
+ def b5 = upd5.get(0).get(0)
+ log.info("incr count=" + cnt + " id1.bigint_u=" + b1 +
" id2.bool_col=" + b2 + " id5.int_u=" + b5)
+ cnt.get(0).get(0) == 10 &&
+ b1 == '18446744073709551615' &&
+ b2 != null && b2.toString() == 'false' && // tinyint(1) is expected to read back as boolean
+ b5 != null && b5.toString() == '4294967295'
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, tinyint_u, smallint_u,
mediumint_u, int_u, bigint_u, bool_col, tinyint_s from ${currentDb}.${table1}
order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.groovy
new file mode 100644
index 00000000000..08fc209b6d1
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_json_types.groovy
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard MySQL JSON column fidelity across both cdc paths:
+// - snapshot (JDBC + information_schema)
+// - binlog event (cdc-client + debezium)
+// MySQL JSON maps to Doris `text`, so the contract is a faithful roundtrip of
+// common JSON shapes. Both paths must render each shape the same way.
+//
+// Coverage is run TWICE: ids 1-14 cover the snapshot path, ids 101-114
+// repeat the same 14 shapes (with different values) through the binlog path. Plus UPDATEs that
+// switch one shape to another to validate UPDATE binlog parsing.
+suite("test_streaming_mysql_job_json_types",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_json_types_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_json_types_pk"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ `id` int primary key,
+ `tag` varchar(64),
+ `j` json
+ ) engine=innodb default charset=utf8mb4;
+ """
+
+ // ----- Snapshot rows: 14 JSON shapes via JDBC path -----
+ sql """insert into ${mysqlDb}.${table1} values (1, 'simple_kv',
'{"id":1,"name":"a"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (2, 'nested_obj',
'{"a":{"b":{"c":1,"d":[1,2]}}}')"""
+ sql """insert into ${mysqlDb}.${table1} values (3, 'array',
'[1,2,3,4,5]')"""
+ sql """insert into ${mysqlDb}.${table1} values (4,
'nested_array', '[[1,2],[3,4],[5,6]]')"""
+ sql """insert into ${mysqlDb}.${table1} values (5, 'mixed',
'{"users":[{"id":1,"tags":["a","b"]},{"id":2,"tags":[]}]}')"""
+ sql """insert into ${mysqlDb}.${table1} values (6, 'empty_obj',
'{}')"""
+ sql """insert into ${mysqlDb}.${table1} values (7, 'empty_arr',
'[]')"""
+ sql """insert into ${mysqlDb}.${table1} values (8,
'unicode_chinese', '{"name":"张三","city":"上海"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (9, 'emoji',
'{"msg":"Hello 🚀 World 😀"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (10,
'newline_in_value', JSON_OBJECT('text', CONVERT(CONCAT('line1', CHAR(10),
'line2') USING utf8mb4)))"""
+ sql """insert into ${mysqlDb}.${table1} values (11,
'quote_in_value', JSON_OBJECT('text', 'she said "hi"'))"""
+ sql """insert into ${mysqlDb}.${table1} values (12,
'scalar_array', '[true,false,null,1,1.5,"str"]')"""
+ sql """insert into ${mysqlDb}.${table1} values (13, 'sql_null',
NULL)"""
+ sql """insert into ${mysqlDb}.${table1} values (14, 'json_null',
'null')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait for the snapshot phase to land all 14 rows (ids 1-14) in Doris; on timeout dump job/task state.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 14
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc_json_types """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, j from ${currentDb}.${table1}
order by id;"""
+
+ // ===== Binlog phase: repeat the SAME 14 shapes through binlog path
=====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """insert into ${mysqlDb}.${table1} values (101, 'simple_kv',
'{"id":101,"name":"b"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (102, 'nested_obj',
'{"a":{"b":{"c":2,"d":[3,4]}}}')"""
+ sql """insert into ${mysqlDb}.${table1} values (103, 'array',
'[10,20,30]')"""
+ sql """insert into ${mysqlDb}.${table1} values (104,
'nested_array', '[[7,8],[9,10]]')"""
+ sql """insert into ${mysqlDb}.${table1} values (105, 'mixed',
'{"users":[{"id":3,"tags":["c"]},{"id":4,"tags":["d","e"]}]}')"""
+ sql """insert into ${mysqlDb}.${table1} values (106, 'empty_obj',
'{}')"""
+ sql """insert into ${mysqlDb}.${table1} values (107, 'empty_arr',
'[]')"""
+ sql """insert into ${mysqlDb}.${table1} values (108,
'unicode_chinese', '{"name":"李四","city":"北京"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (109, 'emoji',
'{"msg":"Bye 👋 👀"}')"""
+ sql """insert into ${mysqlDb}.${table1} values (110,
'newline_in_value', JSON_OBJECT('text', CONVERT(CONCAT('abc', CHAR(10), 'def')
USING utf8mb4)))"""
+ sql """insert into ${mysqlDb}.${table1} values (111,
'quote_in_value', JSON_OBJECT('text', 'he said "ok"'))"""
+ sql """insert into ${mysqlDb}.${table1} values (112,
'scalar_array', '[false,true,null,42,3.14,"x"]')"""
+ sql """insert into ${mysqlDb}.${table1} values (113, 'sql_null',
NULL)"""
+ sql """insert into ${mysqlDb}.${table1} values (114, 'json_null',
'null')"""
+
+ // UPDATEs: switch JSON shape via binlog path
+ // id=1: simple_kv -> nested object (verifies UPDATE-on-JSON
shape change)
+ // id=8: chinese -> emoji (verifies UPDATE on 4-byte
UTF-8)
+ // id=13: SQL NULL -> non-null JSON (verifies NULL -> value
transition)
+ sql """update ${mysqlDb}.${table1} set
j='{"u":{"v":{"w":[1,2,3]}}}' where id=1"""
+ sql """update ${mysqlDb}.${table1} set j='{"msg":"after-update
🎉"}' where id=8"""
+ sql """update ${mysqlDb}.${table1} set j='{"recovered":true}'
where id=13"""
+ }
+
+ // Wait until all 14 binlog inserts (28 rows total) and the 3 updates on ids 1/8/13 are visible.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select j from ${currentDb}.${table1}
where id=1"""
+ def upd8 = sql """select j from ${currentDb}.${table1}
where id=8"""
+ def upd13 = sql """select j from
${currentDb}.${table1} where id=13"""
+ def j1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def j8 = upd8.get(0).get(0) == null ? '' :
upd8.get(0).get(0).toString()
+ def j13 = upd13.get(0).get(0) == null ? '' :
upd13.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " j1=" + j1 + " j8=" +
j8 + " j13=" + j13)
+ cnt.get(0).get(0) == 28 &&
+ j1.contains('"u"') &&
+ j8.contains('after-update') &&
+ j13.contains('recovered')
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, j from ${currentDb}.${table1}
order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.groovy
new file mode 100644
index 00000000000..9e5d33d7c93
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_partition.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// cdc of a MySQL RANGE-partitioned table, symmetric to
test_streaming_postgres_job_partition.
+// binlog emits row events at the logical-table level, so INSERT/UPDATE/DELETE
+// across partitions and ADD PARTITION + INSERT on the new partition all flow
+// through normally. DROP PARTITION is intentionally not exercised here: it
+// emits no row events, so the dropped rows would NOT be removed in Doris.
+suite("test_streaming_mysql_job_partition",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_partition_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_mysql_orders"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // 1. create MySQL partitioned table and insert snapshot data
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+
+ // MySQL requires the partition expression columns to be part of
every
+ // unique/primary key, so (id, order_date) is used as PK.
+ sql """
+ CREATE TABLE ${mysqlDb}.${table1} (
+ id BIGINT,
+ user_id BIGINT,
+ order_date DATE,
+ PRIMARY KEY (id, order_date)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8
+ PARTITION BY RANGE COLUMNS(order_date) (
+ PARTITION p202401 VALUES LESS THAN ('2024-02-01'),
+ PARTITION p202402 VALUES LESS THAN ('2024-03-01')
+ )
+ """
+
+ // snapshot rows, one per partition (id=1 -> p202401, id=2 -> p202402)
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, user_id, order_date)
VALUES (1, 1001, '2024-01-10')"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, user_id, order_date)
VALUES (2, 1002, '2024-02-05')"""
+ }
+
+ // 2. create streaming job
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // wait for the 2 snapshot rows to land in Doris; on timeout dump job/task state
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 2
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // 3. check snapshot data (unqualified ${table1} resolves against the session database, i.e. currentDb)
+ qt_select_orders_partition_snapshot """
+ SELECT id, user_id, order_date FROM ${table1} ORDER BY id
+ """
+
+ // 4. binlog phase: DML across partitions + ADD PARTITION + insert new
partition
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ // insert into existing partition p202401
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, user_id, order_date)
VALUES (3, 1003, '2024-01-20')"""
+
+ // update in p202402
+ sql """UPDATE ${mysqlDb}.${table1} SET user_id = 2002 WHERE id = 2
AND order_date = '2024-02-05'"""
+
+ // delete from p202401
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE id = 1 AND
order_date = '2024-01-10'"""
+
+ // add a new partition then insert into it
+ sql """ALTER TABLE ${mysqlDb}.${table1} ADD PARTITION (PARTITION
p202403 VALUES LESS THAN ('2024-04-01'))"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, user_id, order_date)
VALUES (4, 1004, '2024-03-15')"""
+ }
+
+ // wait for binlog to deliver: 2 snapshot rows +2 inserts -1 delete -> 3 rows; id=1 gone,
id=2 user_id should be 2002, id=4 present in the new partition
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def updated = sql """select user_id from
${currentDb}.${table1} where id=2"""
+ def deleted = sql """select count(1) from
${currentDb}.${table1} where id=1"""
+ def newPart = sql """select count(1) from
${currentDb}.${table1} where id=4"""
+ def u2 = updated.size() == 0 ? null :
updated.get(0).get(0)
+ log.info("incr count=" + cnt + " id2.user_id=" + u2 +
" id1.exists=" + deleted + " id4.exists=" + newPart)
+ cnt.get(0).get(0) == 3 &&
+ u2 != null && u2.toString() == '2002' &&
+ deleted.get(0).get(0) == 0 &&
+ newPart.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ def jobInfo = sql """
+ select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(1) == "RUNNING" // job must still be RUNNING after the ADD PARTITION DDL
+
+ qt_select_orders_partition_binlog_all """
+ SELECT id, user_id, order_date FROM ${table1} ORDER BY id
+ """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 15ff0324fa4..9aacdd22c98 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -74,6 +74,7 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
// mock snapshot data
sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('A2', 1);"""
sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('B2', 2);"""
+
}
sql """CREATE JOB ${jobName}
@@ -100,7 +101,6 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
assert showTables.size() == 1
def showTables2 = sql """ show tables from ${currentDb} like
'${table2}'; """
assert showTables2.size() == 1
-
// check table schema correct
def showTbl1 = sql """show create table ${currentDb}.${table1}"""
def createTalInfo = showTbl1[0][1];
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.groovy
new file mode 100644
index 00000000000..f6e5a64a666
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_boundary.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard PG array column boundaries across cdc snapshot + binlog paths.
+//
+// PG _all_type already covers a basic single-dimension integer[] / text[]
case.
+// This suite covers the boundaries that _all_type and _array_types do not:
+// - Array elements containing NULL '{1,NULL,3}'
+// - Empty array '{}' VS SQL NULL (two flavours of "empty")
+// - Text array elements containing commas that need PG quoting (binlog-only row id=106)
+// Multi-dimensional arrays are not covered: Debezium (DBZ-315) and Flink CDC
+// upstream do not support multi-dim PG arrays — inner elements come through as
+// NULL. The `multi_dim` column below is kept but only carries 1D data.
+//
+// snapshot ids 1..5, then binlog ids 101..105 repeat the same themes (plus binlog-only id 106).
+// UPDATE rewrites a few rows to validate UPDATE binlog parsing on arrays.
+suite("test_streaming_postgres_job_array_boundary",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_array_boundary_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_array_boundary_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(64),
+ int_arr integer[],
+ text_arr text[],
+ multi_dim integer[],
+ empty_or_null integer[]
+ );
+ """
+
+ // ----- Snapshot rows: 5 array boundary themes via JDBC path -----
+ // basic: regular single-dim arrays
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'basic', ARRAY[1,2,3], ARRAY['a','b','c'], NULL,
ARRAY[7,8,9])"""
+
+ // null_elements: NULL inside arrays must roundtrip distinctly
from a SQL NULL (missing) array
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'null_elements', ARRAY[1,NULL,3], ARRAY['a',NULL,'c'], NULL,
NULL)"""
+
+ // multi_dim: Debezium does not support multi-dim PG arrays
(DBZ-315);
+ // stay 1D to keep cdc behaviour deterministic.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3,
'multi_dim', NULL, NULL,
ARRAY[1,2,3,4,5,6], NULL)"""
+
+ // empty_array: '{}' is the empty array, distinct from SQL NULL
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (4,
'empty_array', '{}'::integer[], '{}'::text[], NULL,
'{}'::integer[])"""
+
+ // sql_null_array: only id and tag are set, so every array column is SQL NULL
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(5, 'sql_null_array')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait for the snapshot phase to land all 5 rows (ids 1-5) in Doris; on timeout dump job/task state.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc_array_boundary """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, int_arr, text_arr, multi_dim,
empty_or_null from ${currentDb}.${table1} order by id;"""
+
+ // ===== Binlog phase: repeat the same themes through the WAL (logical decoding) path =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101,
'basic', ARRAY[10,20,30], ARRAY['x','y','z'], NULL,
ARRAY[40,50])"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102,
'null_elements', ARRAY[NULL,2,NULL], ARRAY[NULL,'mid',NULL], NULL,
NULL)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103,
'multi_dim', NULL, NULL,
ARRAY[10,20,30,40], NULL)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (104,
'empty_array', '{}'::integer[], '{}'::text[], NULL,
'{}'::integer[])"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(105, 'sql_null_array')"""
+
+ // Extra binlog-only probe: text elements containing commas — PG's array
text format
+ // double-quotes such elements (e.g. {"with,comma",plain}) and cdc must unquote them.
Embedded double
+ // quotes are intentionally avoided: Doris SELECT for array<text>
does not
+ // escape inner '"' in its display format, which makes the .out
output
+ // ambiguous to verify.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (106,
'text_special', NULL, ARRAY['with,comma', 'a, b, c',
'plain'], NULL, NULL)"""
+
+ // UPDATEs: validate UPDATE binlog parsing on array columns.
+ // id=1 (basic) -> grow int_arr
+ // id=4 (empty) -> empty array '{}' becomes non-empty
+ // id=5 (sql_null) -> NULL becomes non-null array
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
int_arr=ARRAY[100,200,300,400] WHERE id=1"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
empty_or_null=ARRAY[1,2,3] WHERE id=4"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
int_arr=ARRAY[9,9,9] WHERE id=5"""
+ }
+
+ // Wait until all 6 binlog inserts (11 rows total) and the 3 updates on ids 1/4/5 are visible.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select int_arr from
${currentDb}.${table1} where id=1"""
+ def upd4 = sql """select empty_or_null from
${currentDb}.${table1} where id=4"""
+ def upd5 = sql """select int_arr from
${currentDb}.${table1} where id=5"""
+ def a1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def a4 = upd4.get(0).get(0) == null ? '' :
upd4.get(0).get(0).toString()
+ def a5 = upd5.get(0).get(0) == null ? '' :
upd5.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " id1.int_arr=" + a1 +
" id4.eon=" + a4 + " id5.int_arr=" + a5)
+ cnt.get(0).get(0) == 11 &&
+ a1.contains('400') &&
+ a4.contains('1') && a4.contains('2') &&
a4.contains('3') &&
+ a5.contains('9')
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, int_arr, text_arr, multi_dim,
empty_or_null from ${currentDb}.${table1} order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.groovy
new file mode 100644
index 00000000000..65bb02a47d0
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jsonb_types.groovy
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard PG json vs jsonb fidelity across the cdc snapshot + binlog (WAL) paths.
+//
+// json : textual storage; preserves whitespace, key order and duplicate keys.
+// jsonb : binary storage; drops insignificant whitespace, does not keep key
+// order, and dedups keys (last value wins). jsonb is the more common choice.
+//
+// The _all_type suite already covers simple objects in both columns. This
+// suite covers the shapes that matter most for cdc fidelity:
+// - Nested objects, arrays, and mixed structures
+// - Unicode (CJK; emoji via UPDATE) and SQL NULL (no JSON-null literal row)
+// - Whitespace difference between json and jsonb on the SAME input
+// - Duplicate key handling difference between json and jsonb
+//
+// Snapshot ids 1..7, then binlog ids 101..107 repeat the same themes.
+// UPDATEs on ids 1, 4 and 7 validate UPDATE event parsing on json/jsonb.
+suite("test_streaming_postgres_job_jsonb_types",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_jsonb_types_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_jsonb_types_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // clean up leftovers from a previous run
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // PG-backed steps run only when enableJdbcTest=true
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(64),
+ j_json json,
+ j_jsonb jsonb
+ );
+ """
+
+ // ----- Snapshot rows: 7 JSON shape themes, read by the JDBC snapshot -----
+ // simple_kv: baseline single-level object
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'simple_kv', '{"id":1,"name":"a"}', '{"id":1,"name":"a"}')"""
+
+ // nested: 3-level nested object containing an array
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'nested', '{"a":{"b":{"c":1,"d":[1,2]}}}',
'{"a":{"b":{"c":1,"d":[1,2]}}}')"""
+
+ // mixed: object containing arrays of objects (real-world shape)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3,
'mixed', '{"users":[{"id":1,"tags":["a","b"]},{"id":2,"tags":[]}]}',
'{"users":[{"id":1,"tags":["a","b"]},{"id":2,"tags":[]}]}')"""
+
+ // unicode: non-ASCII values (CJK, 3 bytes each in UTF-8); keys are ASCII
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (4,
'unicode', '{"name":"张三","city":"上海"}', '{"name":"张三","city":"上海"}')"""
+
+ // with_spaces: whitespace differs between json (preserved) and
jsonb (canonicalized)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5,
'with_spaces','{ "a" : 1 , "b" : 2 }', '{ "a" : 1 , "b" : 2 }')"""
+
+ // dup_keys: json keeps duplicate keys (last value wins on access);
+ // jsonb dedups at parse time and only retains the last value for
the key.
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (6,
'dup_keys', '{"k":1,"k":2}', '{"k":1,"k":2}')"""
+
+ // sql_null: j_json and j_jsonb are both SQL NULL (omitted from the INSERT)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(7, 'sql_null')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait (up to 300s, polling every 2s) for the snapshot to land all 7 rows.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 7
+ }
+ )
+ } catch (Exception ex) { // log job/task state for debugging, then rethrow
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc_jsonb_types """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, j_json, j_jsonb from
${currentDb}.${table1} order by id;"""
+
+ // ===== Binlog (WAL) phase: same shapes via PG logical decoding =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101,
'simple_kv', '{"id":101,"name":"b"}', '{"id":101,"name":"b"}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102,
'nested', '{"a":{"b":{"c":2,"d":[3,4]}}}',
'{"a":{"b":{"c":2,"d":[3,4]}}}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103,
'mixed', '{"users":[{"id":3,"tags":["c"]},{"id":4,"tags":["d","e"]}]}',
'{"users":[{"id":3,"tags":["c"]},{"id":4,"tags":["d","e"]}]}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (104,
'unicode', '{"name":"李四","city":"北京"}', '{"name":"李四","city":"北京"}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (105,
'with_spaces','{ "x" : 10 , "y" : 20 }', '{ "x" : 10 , "y" : 20
}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (106,
'dup_keys', '{"k":10,"k":20,"k":30}', '{"k":10,"k":20,"k":30}')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(107, 'sql_null')"""
+
+ // UPDATEs: validate UPDATE binlog parsing on json/jsonb columns.
+ // id=1 (simple_kv) -> swap to nested shape
+ // id=4 (unicode) -> swap to emoji content (4-byte UTF-8)
+ // id=7 (sql_null) -> j_jsonb NULL -> non-null; j_json stays NULL
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
j_json='{"u":{"v":[1,2,3]}}', j_jsonb='{"u":{"v":[1,2,3]}}' WHERE id=1"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
j_json='{"msg":"updated 🎉"}', j_jsonb='{"msg":"updated 🎉"}' WHERE id=4"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
j_jsonb='{"recovered":true}' WHERE id=7"""
+ }
+
+ // Wait (up to 180s) for 14 rows total plus the 3 updated j_jsonb values.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select j_jsonb from
${currentDb}.${table1} where id=1"""
+ def upd4 = sql """select j_jsonb from
${currentDb}.${table1} where id=4"""
+ def upd7 = sql """select j_jsonb from
${currentDb}.${table1} where id=7"""
+ def b1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def b4 = upd4.get(0).get(0) == null ? '' :
upd4.get(0).get(0).toString()
+ def b7 = upd7.get(0).get(0) == null ? '' :
upd7.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " id1.jb=" + b1 + "
id4.jb=" + b4 + " id7.jb=" + b7)
+ cnt.get(0).get(0) == 14 &&
+ b1.contains('"u"') &&
+ b4.contains('updated') &&
+ b7.contains('recovered')
+ }
+ )
+ } catch (Exception ex) { // log job/task state for debugging, then rethrow
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, j_json, j_jsonb from
${currentDb}.${table1} order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // success path only; a failed wait rethrows before this
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.groovy
new file mode 100644
index 00000000000..57dfa9bc24a
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_uuid.groovy
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Guard PG uuid / uuid[] boundary values across the cdc snapshot + binlog (WAL) paths.
+//
+// Why a dedicated suite:
+// PG stores UUID as 16 bytes internally; its text form (what logical
+// decoding emits) is the 36-char hyphenated string. cdc must render it the
+// same way on the snapshot (JDBC) and binlog paths. _all_type covers one
+// normal UUID; this suite adds boundary values (nil/all-zero, max/all-ff),
+// uuid[] arrays, and NULL handling. UUID is also a common primary-key
+// type in PG schemas (here the PK is an integer id).
+//
+// Snapshot ids 1..5, then binlog ids 101..105 repeat the same themes.
+// UPDATEs on ids 1, 2 and 5 validate UPDATE event parsing on uuid_col.
+suite("test_streaming_postgres_job_uuid",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_uuid_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_uuid_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // clean up leftovers from a previous run
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // PG-backed steps run only when enableJdbcTest=true
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(64),
+ uuid_col uuid,
+ uuid_arr uuid[]
+ );
+ """
+
+ // ----- Snapshot rows: 5 UUID boundary themes, read by the JDBC snapshot -----
+ // basic: regular UUID value (the kind _all_type uses)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'basic', '11111111-2222-3333-4444-555555555555'::uuid, NULL)"""
+
+ // all_zero: the nil UUID (all 128 bits zero; legal but degenerate)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'all_zero', '00000000-0000-0000-0000-000000000000'::uuid, NULL)"""
+
+ // all_ff: the max UUID (all 16 bytes 0xff, upper bound of
UUID space)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3,
'all_ff', 'ffffffff-ffff-ffff-ffff-ffffffffffff'::uuid, NULL)"""
+
+ // uuid_array: uuid[] with three elements including the nil UUID;
uuid_col is NULL
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (4,
'uuid_array', NULL, ARRAY['11111111-1111-1111-1111-111111111111'::uuid,
'22222222-2222-2222-2222-222222222222'::uuid,
'00000000-0000-0000-0000-000000000000'::uuid])"""
+
+ // sql_null: uuid_col and uuid_arr are both SQL NULL (omitted from the INSERT)
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(5, 'sql_null')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait (up to 300s, polling every 2s) for the snapshot to land all 5 rows.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) { // log job/task state for debugging, then rethrow
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // Key contract (checked against the .out file): UUIDs surface as the 36-char
+ // hyphenated form (lower-case, no surrounding quotes); arrays keep order.
+ qt_desc_uuid """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select id, tag, uuid_col, uuid_arr from
${currentDb}.${table1} order by id;"""
+
+ // ===== Binlog (WAL) phase: same UUID boundary themes via logical decoding
=====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101,
'basic', 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'::uuid, NULL)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102,
'all_zero', '00000000-0000-0000-0000-000000000000'::uuid, NULL)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103,
'all_ff', 'ffffffff-ffff-ffff-ffff-ffffffffffff'::uuid, NULL)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (104,
'uuid_array', NULL, ARRAY['33333333-3333-3333-3333-333333333333'::uuid,
'44444444-4444-4444-4444-444444444444'::uuid])"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(105, 'sql_null')"""
+
+ // UPDATEs: validate UPDATE event parsing on uuid_col (uuid_arr untouched).
+ // id=1 (basic) -> swap to all-ff
+ // id=2 (all_zero) -> back to a normal UUID
+ // id=5 (sql_null) -> NULL -> non-null UUID
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
uuid_col='ffffffff-ffff-ffff-ffff-ffffffffffff'::uuid WHERE id=1"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
uuid_col='99999999-9999-9999-9999-999999999999'::uuid WHERE id=2"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET
uuid_col='12345678-1234-1234-1234-123456789012'::uuid WHERE id=5"""
+ }
+
+ // Wait (up to 180s) for 10 rows total plus the 3 updated uuid_col values.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd1 = sql """select uuid_col from
${currentDb}.${table1} where id=1"""
+ def upd2 = sql """select uuid_col from
${currentDb}.${table1} where id=2"""
+ def upd5 = sql """select uuid_col from
${currentDb}.${table1} where id=5"""
+ def u1 = upd1.get(0).get(0) == null ? '' :
upd1.get(0).get(0).toString()
+ def u2 = upd2.get(0).get(0) == null ? '' :
upd2.get(0).get(0).toString()
+ def u5 = upd5.get(0).get(0) == null ? '' :
upd5.get(0).get(0).toString()
+ log.info("incr count=" + cnt + " id1.uuid=" + u1 + "
id2.uuid=" + u2 + " id5.uuid=" + u5)
+ cnt.get(0).get(0) == 10 &&
+ u1.toLowerCase().startsWith('ffffffff') &&
+ u2.toLowerCase().startsWith('99999999') &&
+ u5.toLowerCase().startsWith('12345678')
+ }
+ )
+ } catch (Exception ex) { // log job/task state for debugging, then rethrow
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr """select id, tag, uuid_col, uuid_arr from
${currentDb}.${table1} order by id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" // success path only; a failed wait rethrows before this
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]