github-actions[bot] commented on code in PR #61317:
URL: https://github.com/apache/doris/pull/61317#discussion_r2963663349


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -169,6 +169,31 @@ public static Map<String, Set<String>> 
parseAllExcludeColumns(Map<String, String
         return result;
     }
 
+    /**
+     * Parse all target-table name mappings from config.
+     *
+     * <p>Scans all keys matching {@code "table.<srcTableName>.target_table"} 
and returns a map from

Review Comment:
   **Nit: Indentation issue.** The Javadoc `/**` on this line has 6 leading 
spaces instead of 4, making it misaligned with the rest of the file. The 
`parseAllExcludeColumns()` Javadoc above uses 4 spaces. Please fix the 
indentation to be consistent:
   ```java
       /**
        * Parse all target-table name mappings from config.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -322,6 +332,12 @@ public static List<CreateTableCommand> 
generateCreateTableCmds(String targetDb,
                 noPrimaryKeyTables.add(table);
             }
 
+            // Resolve target (Doris) table name; defaults to source table 
name if not configured
+            String targetTableName = properties.getOrDefault(
+                    DataSourceConfigKeys.TABLE + "." + table + "."
+                            + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
+                    table);

Review Comment:
   **Design concern (multi-table merge): No deduplication or schema 
compatibility check when multiple source tables map to the same target.**
   
   When two source tables (e.g., `pg_src_table` and `pg_src_table2`) both map 
to the same Doris target table, the `LinkedHashMap` will contain two entries 
with different keys but `CreateTableCommand` objects targeting the same Doris 
table. In `createTableIfNotExists()`, the first entry creates the table and the 
second is skipped via `isTableExist()`. This means the **first source table's 
schema silently wins**.
   
   If the two source tables have different schemas (e.g., different columns or 
column types), the data from the second source table may fail or silently drop 
columns.
   
   Suggestion: Either (a) validate that all source tables mapping to the same 
target have compatible schemas, or (b) at minimum log a warning when multiple 
source tables map to the same Doris target table, so users are aware of the 
first-wins behavior. This could be a follow-up improvement.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy:
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_table_mapping", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName         = "test_streaming_pg_table_mapping"
+    def jobNameMerge    = "test_streaming_pg_table_mapping_merge"
+    def currentDb       = (sql "select database()")[0][0]
+    def pgSrcTable      = "pg_src_table"        // upstream PG table name
+    def dorisDstTable   = "doris_dst_table"     // downstream Doris table name 
(mapped)
+    def pgSrcTable2     = "pg_src_table2"       // second upstream table 
(multi-table merge)
+    def dorisMergeTable = "doris_merge_table"   // both PG tables merge into 
this Doris table
+    def pgDB            = "postgres"
+    def pgSchema        = "cdc_test"
+    def pgUser          = "postgres"
+    def pgPassword      = "123456"
+
+    // Cleanup
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+    sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+    sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── Case 1: basic table name mapping 
─────────────────────────────────
+        // PG table: pg_src_table → Doris table: doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (1, 
'Alice')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (2, 
'Bob')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table" = "${dorisDstTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the Doris table was created with the mapped name, not the 
source name
+        def tables = (sql """show tables from ${currentDb}""").collect { it[0] 
}
+        assert tables.contains(dorisDstTable) : "Doris target table 
'${dorisDstTable}' should exist"
+        assert !tables.contains(pgSrcTable)   : "Source table name 
'${pgSrcTable}' must NOT exist in Doris"
+
+        // Wait for snapshot
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC 
"""
+
+        // Incremental: INSERT / UPDATE / DELETE must all land in 
doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (3, 
'Carol')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${pgSrcTable} SET name = 
'Bob_v2' WHERE id = 2"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${pgSrcTable} WHERE id = 
1"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id 
ASC """).collect { it[0].toInteger() }
+                ids.contains(3) && !ids.contains(1)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id 
ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // ── Case 2: multi-table merge (two PG tables → one Doris table) 
──────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable2} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (100, 
'Src1_A')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (200, 
'Src2_A')"""
+        }
+
+        sql """CREATE JOB ${jobNameMerge}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable},${pgSrcTable2}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table"  = "${dorisMergeTable}",
+                    "table.${pgSrcTable2}.target_table" = "${dorisMergeTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Wait for snapshot rows from both source tables
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(100) && ids.contains(200)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY 
id ASC """
+
+        // Incremental from both source tables
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (101, 
'Src1_B')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (201, 
'Src2_B')"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(101) && ids.contains(201)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER 
BY id ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+        def mergeJobCnt = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobNameMerge}'"""
+        assert mergeJobCnt.get(0).get(0) == 0
+    }
+}

Review Comment:
   **Missing negative test cases.** Consider adding tests for error scenarios:
   1. `table.<src>.target_table` with an empty value — should be rejected by 
`DataSourceConfigValidator`
   2. A `target_table` mapping for a table not in `include_tables` — what 
happens? Should this be validated or silently ignored?
   3. Invalid target table name (e.g., containing special characters)
   
   These would strengthen confidence that the validation path works correctly 
end-to-end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to