This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new ef260ab5e45 branch-4.1: [Improve](streaming job) support custom table 
name mapping for CDC streaming job  #61317 (#61605)
ef260ab5e45 is described below

commit ef260ab5e452f56ce5c5d0364d48418f11bc4c9e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:43:58 2026 +0800

    branch-4.1: [Improve](streaming job) support custom table name mapping for 
CDC streaming job  #61317 (#61605)
    
    Cherry-picked from #61317
    
    Co-authored-by: wudi <[email protected]>
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   1 +
 .../streaming/DataSourceConfigValidator.java       |   5 +
 .../insert/streaming/StreamingInsertJob.java       |  15 +-
 .../apache/doris/job/util/StreamingJobUtils.java   |  25 ++-
 .../cdcclient/service/PipelineCoordinator.java     |   8 +-
 .../deserialize/DebeziumJsonDeserializer.java      |  11 ++
 .../PostgresDebeziumJsonDeserializer.java          |   6 +-
 .../apache/doris/cdcclient/utils/ConfigUtil.java   |  25 +++
 .../test_streaming_postgres_job_table_mapping.out  |  19 ++
 ...est_streaming_postgres_job_table_mapping.groovy | 194 +++++++++++++++++++++
 10 files changed, 298 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index e3b441b4e3e..47ee5f21d27 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -42,6 +42,7 @@ public class DataSourceConfigKeys {
     // per-table config: key format is "table.<tableName>.<suffix>"
     public static final String TABLE = "table";
     public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = 
"exclude_columns";
+    public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";
 
     // target properties
     public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 4e313796b62..b75e202b1a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -45,6 +45,7 @@ public class DataSourceConfigValidator {
 
     // Known suffixes for per-table config keys (format: 
"table.<tableName>.<suffix>")
     private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = 
Sets.newHashSet(
+            DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
             DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
     );
 
@@ -67,6 +68,10 @@ public class DataSourceConfigValidator {
                 if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
                     throw new IllegalArgumentException("Unknown per-table 
config key: '" + key + "'");
                 }
+                if (value == null || value.trim().isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Value for per-table config key '" + key + "' must 
not be empty");
+                }
                 continue;
             }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index ee3ee5dabe2..c999b9d99e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -92,6 +92,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -255,15 +256,21 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     private List<String> createTableIfNotExists() throws Exception {
         List<String> syncTbls = new ArrayList<>();
-        List<CreateTableCommand> createTblCmds = 
StreamingJobUtils.generateCreateTableCmds(targetDb,
-                dataSourceType, sourceProperties, targetProperties);
+        // Key: source table name (PG/MySQL); Value: CreateTableCommand for 
the Doris target table.
+        // The two names differ when "table.<src>.target_table" is configured.
+        LinkedHashMap<String, CreateTableCommand> createTblCmds =
+                StreamingJobUtils.generateCreateTableCmds(targetDb,
+                        dataSourceType, sourceProperties, targetProperties);
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbNullable(targetDb);
         Preconditions.checkNotNull(db, "target database %s does not exist", 
targetDb);
-        for (CreateTableCommand createTblCmd : createTblCmds) {
+        for (Map.Entry<String, CreateTableCommand> entry : 
createTblCmds.entrySet()) {
+            String srcTable = entry.getKey();
+            CreateTableCommand createTblCmd = entry.getValue();
             if 
(!db.isTableExist(createTblCmd.getCreateTableInfo().getTableName())) {
                 createTblCmd.run(ConnectContext.get(), null);
             }
-            syncTbls.add(createTblCmd.getCreateTableInfo().getTableName());
+            // Use the source (upstream) table name so CDC monitors the 
correct PG/MySQL table
+            syncTbls.add(srcTable);
         }
         return syncTbls;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index d2222dad383..9eec0061219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -277,10 +277,20 @@ public class StreamingJobUtils {
         return newProps;
     }
 
-    public static List<CreateTableCommand> generateCreateTableCmds(String 
targetDb, DataSourceType sourceType,
+    /**
+     * Generate CREATE TABLE commands for the Doris target tables.
+     *
+     * <p>Returns a {@link LinkedHashMap} whose key is the <b>source</b> 
(upstream) table name and
+     * whose value is the corresponding {@link CreateTableCommand} that 
creates the Doris target
+     * table (which may have a different name when {@code 
table.<src>.target_table} is configured).
+     * Callers must use the map key as the PG/MySQL source table identifier 
for CDC monitoring and
+     * the {@link CreateTableCommand} value for the actual DDL execution.
+     */
+    public static LinkedHashMap<String, CreateTableCommand> 
generateCreateTableCmds(String targetDb,
+            DataSourceType sourceType,
             Map<String, String> properties, Map<String, String> 
targetProperties)
             throws JobException {
-        List<CreateTableCommand> createtblCmds = new ArrayList<>();
+        LinkedHashMap<String, CreateTableCommand> createtblCmds = new 
LinkedHashMap<>();
         String includeTables = 
properties.get(DataSourceConfigKeys.INCLUDE_TABLES);
         String excludeTables = 
properties.get(DataSourceConfigKeys.EXCLUDE_TABLES);
         List<String> includeTablesList = new ArrayList<>();
@@ -322,6 +332,12 @@ public class StreamingJobUtils {
                 noPrimaryKeyTables.add(table);
             }
 
+            // Resolve target (Doris) table name; defaults to source table 
name if not configured
+            String targetTableName = properties.getOrDefault(
+                    DataSourceConfigKeys.TABLE + "." + table + "."
+                            + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
+                    table).trim();
+
             // Validate and apply exclude_columns for this table
             Set<String> excludeColumns = parseExcludeColumns(properties, 
table);
             if (!excludeColumns.isEmpty()) {
@@ -352,7 +368,7 @@ public class StreamingJobUtils {
                     false, // isTemp
                     InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
                     targetDb, // dbName
-                    table, // tableName
+                    targetTableName, // tableName
                     columnDefinitions, // columns
                     ImmutableList.of(), // indexes
                     "olap", // engineName
@@ -367,7 +383,8 @@ public class StreamingJobUtils {
                     ImmutableList.of() // clusterKeyColumnNames
             );
             CreateTableCommand createtblCmd = new 
CreateTableCommand(Optional.empty(), createtblInfo);
-            createtblCmds.add(createtblCmd);
+            // Key: source (PG/MySQL) table name; Value: command that creates 
the Doris target table
+            createtblCmds.put(table, createtblCmd);
         }
         if (createtblCmds.isEmpty()) {
             throw new JobException("Can not found match table in database " + 
database);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 97aa4b7f5f2..414a1d23797 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
 import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
 import org.apache.doris.cdcclient.utils.SchemaChangeManager;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -249,6 +250,10 @@ public class PipelineCoordinator {
         Map<String, String> deserializeContext = new 
HashMap<>(writeRecordRequest.getConfig());
         deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
 
+        // Pre-parse source->target table name mappings once for this request
+        Map<String, String> targetTableMappings =
+                
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
+
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
@@ -338,9 +343,10 @@ public class PipelineCoordinator {
                     }
                     if (!CollectionUtils.isEmpty(result.getRecords())) {
                         String table = extractTable(element);
+                        String dorisTable = 
targetTableMappings.getOrDefault(table, table);
                         for (String record : result.getRecords()) {
                             scannedRows++;
-                            batchStreamLoad.writeRecord(targetDb, table, 
record.getBytes());
+                            batchStreamLoad.writeRecord(targetDb, dorisTable, 
record.getBytes());
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 25b2b544893..7876597660e 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -82,6 +82,8 @@ public class DebeziumJsonDeserializer
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
     @Getter @Setter protected Map<TableId, TableChanges.TableChange> 
tableSchemas;
+    // Parsed source->target table name mappings, populated once in init() 
from config
+    protected Map<String, String> targetTableMappingsCache = new HashMap<>();
     // Parsed exclude-column sets per table, populated once in init() from 
config
     protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
 
@@ -91,9 +93,18 @@ public class DebeziumJsonDeserializer
     public void init(Map<String, String> props) {
         this.serverTimeZone =
                 
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+        targetTableMappingsCache = 
ConfigUtil.parseAllTargetTableMappings(props);
         excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
     }
 
+    /**
+     * Resolve the Doris target table name for an upstream source table name via the mappings
+     * held in {@code targetTableMappingsCache}; unmapped names are returned unchanged.
+     */
+    protected String resolveTargetTable(String srcTable) {
+        return targetTableMappingsCache.getOrDefault(srcTable, srcTable);
+    }
+
     @Override
     public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
             throws IOException {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index aa1a6e9c7bd..85fdb1ddea7 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -218,7 +218,9 @@ public class PostgresDebeziumJsonDeserializer extends 
DebeziumJsonDeserializer {
                         colName);
                 continue;
             }
-            ddls.add(SchemaChangeHelper.buildDropColumnSql(db, 
tableId.table(), colName));
+            ddls.add(
+                    SchemaChangeHelper.buildDropColumnSql(
+                            db, resolveTargetTable(tableId.table()), colName));
         }
 
         for (Column col : pgAdded) {
@@ -243,7 +245,7 @@ public class PostgresDebeziumJsonDeserializer extends 
DebeziumJsonDeserializer {
             ddls.add(
                     SchemaChangeHelper.buildAddColumnSql(
                             db,
-                            tableId.table(),
+                            resolveTargetTable(tableId.table()),
                             col.name(),
                             colType + nullable,
                             defaultObj != null ? String.valueOf(defaultObj) : 
null,
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 56d9aeac53e..46d581f58e5 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -169,6 +169,31 @@ public class ConfigUtil {
         return result;
     }
 
+    /**
+     * Parse all target-table name mappings from config.
+     *
+     * <p>Maps each source table named by a {@code "table.<srcTableName>.target_table"} key to its
+     * trimmed target (Doris) name. Unmapped tables are NOT included (use {@code getOrDefault});
+     * keys with no source name (e.g. {@code "table.target_table"}) or a blank value are skipped.
+     */
+    public static Map<String, String> parseAllTargetTableMappings(Map<String, String> config) {
+        String prefix = DataSourceConfigKeys.TABLE + ".";
+        String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+            String key = entry.getKey();
+            if (key.length() > prefix.length() + suffix.length() // else substring() would throw
+                    && key.startsWith(prefix) && key.endsWith(suffix)) {
+                String srcTable = key.substring(prefix.length(), key.length() - suffix.length());
+                String dstTable = entry.getValue() != null ? entry.getValue().trim() : "";
+                if (!dstTable.isEmpty()) {
+                    result.put(srcTable, dstTable);
+                }
+            }
+        }
+        return result;
+    }
+
     public static Map<String, String> toStringMap(String json) {
         if (!isJson(json)) {
             return null;
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+1      Alice
+2      Bob
+
+-- !select_incremental --
+2      Bob_v2
+3      Carol
+
+-- !select_merge_snapshot --
+100    Src1_A
+200    Src2_A
+
+-- !select_merge_incremental --
+100    Src1_A
+101    Src1_B
+200    Src2_A
+201    Src2_B
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
new file mode 100644
index 00000000000..b31805e682a
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_table_mapping", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName         = "test_streaming_pg_table_mapping"
+    def jobNameMerge    = "test_streaming_pg_table_mapping_merge"
+    def currentDb       = (sql "select database()")[0][0]
+    def pgSrcTable      = "pg_src_table"        // upstream PG table name
+    def dorisDstTable   = "doris_dst_table"     // downstream Doris table name 
(mapped)
+    def pgSrcTable2     = "pg_src_table2"       // second upstream table 
(multi-table merge)
+    def dorisMergeTable = "doris_merge_table"   // both PG tables merge into 
this Doris table
+    def pgDB            = "postgres"
+    def pgSchema        = "cdc_test"
+    def pgUser          = "postgres"
+    def pgPassword      = "123456"
+
+    // Cleanup
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+    sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+    sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── Case 1: basic table name mapping 
─────────────────────────────────
+        // PG table: pg_src_table → Doris table: doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (1, 
'Alice')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (2, 
'Bob')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table" = "${dorisDstTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the Doris table was created with the mapped name, not the 
source name
+        def tables = (sql """show tables from ${currentDb}""").collect { it[0] 
}
+        assert tables.contains(dorisDstTable) : "Doris target table 
'${dorisDstTable}' should exist"
+        assert !tables.contains(pgSrcTable)   : "Source table name 
'${pgSrcTable}' must NOT exist in Doris"
+
+        // Wait for snapshot
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC 
"""
+
+        // Incremental: INSERT / UPDATE / DELETE must all land in 
doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (3, 
'Carol')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${pgSrcTable} SET name = 
'Bob_v2' WHERE id = 2"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${pgSrcTable} WHERE id = 
1"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id 
ASC """).collect { it[0].toInteger() }
+                ids.contains(3) && !ids.contains(1)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id 
ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // ── Case 2: multi-table merge (two PG tables → one Doris table) 
──────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable2} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (100, 
'Src1_A')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (200, 
'Src2_A')"""
+        }
+
+        sql """CREATE JOB ${jobNameMerge}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable},${pgSrcTable2}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table"  = "${dorisMergeTable}",
+                    "table.${pgSrcTable2}.target_table" = "${dorisMergeTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Wait for snapshot rows from both source tables
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(100) && ids.contains(200)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY 
id ASC """
+
+        // Incremental from both source tables
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (101, 
'Src1_B')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (201, 
'Src2_B')"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(101) && ids.contains(201)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER 
BY id ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+        def mergeJobCnt = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobNameMerge}'"""
+        assert mergeJobCnt.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to