This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new ef260ab5e45 branch-4.1: [Improve](streaming job) support custom table
name mapping for CDC streaming job #61317 (#61605)
ef260ab5e45 is described below
commit ef260ab5e452f56ce5c5d0364d48418f11bc4c9e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:43:58 2026 +0800
branch-4.1: [Improve](streaming job) support custom table name mapping for
CDC streaming job #61317 (#61605)
Cherry-picked from #61317
Co-authored-by: wudi <[email protected]>
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 1 +
.../streaming/DataSourceConfigValidator.java | 5 +
.../insert/streaming/StreamingInsertJob.java | 15 +-
.../apache/doris/job/util/StreamingJobUtils.java | 25 ++-
.../cdcclient/service/PipelineCoordinator.java | 8 +-
.../deserialize/DebeziumJsonDeserializer.java | 11 ++
.../PostgresDebeziumJsonDeserializer.java | 6 +-
.../apache/doris/cdcclient/utils/ConfigUtil.java | 25 +++
.../test_streaming_postgres_job_table_mapping.out | 19 ++
...est_streaming_postgres_job_table_mapping.groovy | 194 +++++++++++++++++++++
10 files changed, 298 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index e3b441b4e3e..47ee5f21d27 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -42,6 +42,7 @@ public class DataSourceConfigKeys {
// per-table config: key format is "table.<tableName>.<suffix>"
public static final String TABLE = "table";
public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX =
"exclude_columns";
+ public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";
// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 4e313796b62..b75e202b1a8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -45,6 +45,7 @@ public class DataSourceConfigValidator {
// Known suffixes for per-table config keys (format:
"table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES =
Sets.newHashSet(
+ DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
);
@@ -67,6 +68,10 @@ public class DataSourceConfigValidator {
if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
throw new IllegalArgumentException("Unknown per-table
config key: '" + key + "'");
}
+ if (value == null || value.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Value for per-table config key '" + key + "' must
not be empty");
+ }
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index ee3ee5dabe2..c999b9d99e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -92,6 +92,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -255,15 +256,21 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private List<String> createTableIfNotExists() throws Exception {
List<String> syncTbls = new ArrayList<>();
- List<CreateTableCommand> createTblCmds =
StreamingJobUtils.generateCreateTableCmds(targetDb,
- dataSourceType, sourceProperties, targetProperties);
+ // Key: source table name (PG/MySQL); Value: CreateTableCommand for
the Doris target table.
+ // The two names differ when "table.<src>.target_table" is configured.
+ LinkedHashMap<String, CreateTableCommand> createTblCmds =
+ StreamingJobUtils.generateCreateTableCmds(targetDb,
+ dataSourceType, sourceProperties, targetProperties);
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbNullable(targetDb);
Preconditions.checkNotNull(db, "target database %s does not exist",
targetDb);
- for (CreateTableCommand createTblCmd : createTblCmds) {
+ for (Map.Entry<String, CreateTableCommand> entry :
createTblCmds.entrySet()) {
+ String srcTable = entry.getKey();
+ CreateTableCommand createTblCmd = entry.getValue();
if
(!db.isTableExist(createTblCmd.getCreateTableInfo().getTableName())) {
createTblCmd.run(ConnectContext.get(), null);
}
- syncTbls.add(createTblCmd.getCreateTableInfo().getTableName());
+ // Use the source (upstream) table name so CDC monitors the
correct PG/MySQL table
+ syncTbls.add(srcTable);
}
return syncTbls;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index d2222dad383..9eec0061219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -277,10 +277,20 @@ public class StreamingJobUtils {
return newProps;
}
- public static List<CreateTableCommand> generateCreateTableCmds(String
targetDb, DataSourceType sourceType,
+ /**
+ * Generate CREATE TABLE commands for the Doris target tables.
+ *
+ * <p>Returns a {@link LinkedHashMap} whose key is the <b>source</b>
(upstream) table name and
+ * whose value is the corresponding {@link CreateTableCommand} that
creates the Doris target
+ * table (which may have a different name when {@code
table.<src>.target_table} is configured).
+ * Callers must use the map key as the PG/MySQL source table identifier
for CDC monitoring and
+ * the {@link CreateTableCommand} value for the actual DDL execution.
+ */
+ public static LinkedHashMap<String, CreateTableCommand>
generateCreateTableCmds(String targetDb,
+ DataSourceType sourceType,
Map<String, String> properties, Map<String, String>
targetProperties)
throws JobException {
- List<CreateTableCommand> createtblCmds = new ArrayList<>();
+ LinkedHashMap<String, CreateTableCommand> createtblCmds = new
LinkedHashMap<>();
String includeTables =
properties.get(DataSourceConfigKeys.INCLUDE_TABLES);
String excludeTables =
properties.get(DataSourceConfigKeys.EXCLUDE_TABLES);
List<String> includeTablesList = new ArrayList<>();
@@ -322,6 +332,12 @@ public class StreamingJobUtils {
noPrimaryKeyTables.add(table);
}
+ // Resolve target (Doris) table name; defaults to source table
name if not configured
+ String targetTableName = properties.getOrDefault(
+ DataSourceConfigKeys.TABLE + "." + table + "."
+ + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
+ table).trim();
+
// Validate and apply exclude_columns for this table
Set<String> excludeColumns = parseExcludeColumns(properties,
table);
if (!excludeColumns.isEmpty()) {
@@ -352,7 +368,7 @@ public class StreamingJobUtils {
false, // isTemp
InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
targetDb, // dbName
- table, // tableName
+ targetTableName, // tableName
columnDefinitions, // columns
ImmutableList.of(), // indexes
"olap", // engineName
@@ -367,7 +383,8 @@ public class StreamingJobUtils {
ImmutableList.of() // clusterKeyColumnNames
);
CreateTableCommand createtblCmd = new
CreateTableCommand(Optional.empty(), createtblInfo);
- createtblCmds.add(createtblCmd);
+ // Key: source (PG/MySQL) table name; Value: command that creates
the Doris target table
+ createtblCmds.put(table, createtblCmd);
}
if (createtblCmds.isEmpty()) {
throw new JobException("Can not found match table in database " +
database);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 97aa4b7f5f2..414a1d23797 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -249,6 +250,10 @@ public class PipelineCoordinator {
Map<String, String> deserializeContext = new
HashMap<>(writeRecordRequest.getConfig());
deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
+ // Pre-parse source->target table name mappings once for this request
+ Map<String, String> targetTableMappings =
+
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
+
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
@@ -338,9 +343,10 @@ public class PipelineCoordinator {
}
if (!CollectionUtils.isEmpty(result.getRecords())) {
String table = extractTable(element);
+ String dorisTable =
targetTableMappings.getOrDefault(table, table);
for (String record : result.getRecords()) {
scannedRows++;
- batchStreamLoad.writeRecord(targetDb, table,
record.getBytes());
+ batchStreamLoad.writeRecord(targetDb, dorisTable,
record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 25b2b544893..7876597660e 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -82,6 +82,8 @@ public class DebeziumJsonDeserializer
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
@Getter @Setter protected Map<TableId, TableChanges.TableChange>
tableSchemas;
+ // Parsed source->target table name mappings, populated once in init()
from config
+ protected Map<String, String> targetTableMappingsCache = new HashMap<>();
// Parsed exclude-column sets per table, populated once in init() from
config
protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
@@ -91,9 +93,18 @@ public class DebeziumJsonDeserializer
public void init(Map<String, String> props) {
this.serverTimeZone =
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+ targetTableMappingsCache =
ConfigUtil.parseAllTargetTableMappings(props);
excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
}
+ /**
+ * Resolve the Doris target table name for a given upstream (PG) source
table name. Returns the
+ * mapped name if configured, otherwise returns the source name unchanged.
+ */
+ protected String resolveTargetTable(String srcTable) {
+ return targetTableMappingsCache.getOrDefault(srcTable, srcTable);
+ }
+
@Override
public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
throws IOException {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index aa1a6e9c7bd..85fdb1ddea7 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -218,7 +218,9 @@ public class PostgresDebeziumJsonDeserializer extends
DebeziumJsonDeserializer {
colName);
continue;
}
- ddls.add(SchemaChangeHelper.buildDropColumnSql(db,
tableId.table(), colName));
+ ddls.add(
+ SchemaChangeHelper.buildDropColumnSql(
+ db, resolveTargetTable(tableId.table()), colName));
}
for (Column col : pgAdded) {
@@ -243,7 +245,7 @@ public class PostgresDebeziumJsonDeserializer extends
DebeziumJsonDeserializer {
ddls.add(
SchemaChangeHelper.buildAddColumnSql(
db,
- tableId.table(),
+ resolveTargetTable(tableId.table()),
col.name(),
colType + nullable,
defaultObj != null ? String.valueOf(defaultObj) :
null,
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 56d9aeac53e..46d581f58e5 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -169,6 +169,31 @@ public class ConfigUtil {
return result;
}
+ /**
+ * Parse all target-table name mappings from config.
+ *
+ * <p>Scans all keys matching {@code "table.<srcTableName>.target_table"}
and returns a map from
+ * source table name to target (Doris) table name. Tables without a
mapping are NOT included;
+ * callers should use {@code getOrDefault(srcTable, srcTable)}.
+ */
+ public static Map<String, String> parseAllTargetTableMappings(Map<String,
String> config) {
+ String prefix = DataSourceConfigKeys.TABLE + ".";
+ String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
+ Map<String, String> result = new HashMap<>();
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix) && key.endsWith(suffix)) {
+ String srcTable = key.substring(prefix.length(), key.length()
- suffix.length());
+ String rawValue = entry.getValue();
+ String dstTable = rawValue != null ? rawValue.trim() : "";
+ if (!srcTable.isEmpty() && !dstTable.isEmpty()) {
+ result.put(srcTable, dstTable);
+ }
+ }
+ }
+ return result;
+ }
+
public static Map<String, String> toStringMap(String json) {
if (!isJson(json)) {
return null;
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+1 Alice
+2 Bob
+
+-- !select_incremental --
+2 Bob_v2
+3 Carol
+
+-- !select_merge_snapshot --
+100 Src1_A
+200 Src2_A
+
+-- !select_merge_incremental --
+100 Src1_A
+101 Src1_B
+200 Src2_A
+201 Src2_B
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
new file mode 100644
index 00000000000..b31805e682a
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_table_mapping",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_table_mapping"
+ def jobNameMerge = "test_streaming_pg_table_mapping_merge"
+ def currentDb = (sql "select database()")[0][0]
+ def pgSrcTable = "pg_src_table" // upstream PG table name
+ def dorisDstTable = "doris_dst_table" // downstream Doris table name
(mapped)
+ def pgSrcTable2 = "pg_src_table2" // second upstream table
(multi-table merge)
+ def dorisMergeTable = "doris_merge_table" // both PG tables merge into
this Doris table
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Cleanup
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+ sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── Case 1: basic table name mapping
─────────────────────────────────
+ // PG table: pg_src_table → Doris table: doris_dst_table
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (1,
'Alice')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (2,
'Bob')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${pgSrcTable}",
+ "offset" = "initial",
+ "table.${pgSrcTable}.target_table" = "${dorisDstTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify the Doris table was created with the mapped name, not the
source name
+ def tables = (sql """show tables from ${currentDb}""").collect { it[0]
}
+ assert tables.contains(dorisDstTable) : "Doris target table
'${dorisDstTable}' should exist"
+ assert !tables.contains(pgSrcTable) : "Source table name
'${pgSrcTable}' must NOT exist in Doris"
+
+ // Wait for snapshot
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC
"""
+
+ // Incremental: INSERT / UPDATE / DELETE must all land in
doris_dst_table
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (3,
'Carol')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${pgSrcTable} SET name =
'Bob_v2' WHERE id = 2"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${pgSrcTable} WHERE id =
1"""
+ }
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id
ASC """).collect { it[0].toInteger() }
+ ids.contains(3) && !ids.contains(1)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id
ASC """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ // ── Case 2: multi-table merge (two PG tables → one Doris table)
──────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable2}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable2} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (100,
'Src1_A')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (200,
'Src2_A')"""
+ }
+
+ sql """CREATE JOB ${jobNameMerge}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${pgSrcTable},${pgSrcTable2}",
+ "offset" = "initial",
+ "table.${pgSrcTable}.target_table" = "${dorisMergeTable}",
+ "table.${pgSrcTable2}.target_table" = "${dorisMergeTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Wait for snapshot rows from both source tables
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(100) && ids.contains(200)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY
id ASC """
+
+ // Incremental from both source tables
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (101,
'Src1_B')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (201,
'Src2_B')"""
+ }
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(101) && ids.contains(201)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER
BY id ASC """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ def mergeJobCnt = sql """select count(1) from jobs("type"="insert")
where Name = '${jobNameMerge}'"""
+ assert mergeJobCnt.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]