This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new e49f7e4  [fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
e49f7e4 is described below

commit e49f7e49cebd561ba12b9a0d6e933248d6a43b92
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon May 13 14:07:15 2024 +0800

    [fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 632edcc..a1f511a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -129,12 +129,14 @@ public abstract class DatabaseSync {
             tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
         }
         Set<String> bucketsTable = new HashSet<>();
+        Set<String> targetDbSet = new HashSet<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
             String targetDb = database;
             // Synchronize multiple databases using the src database name
             if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
                 targetDb = schema.getDatabaseName();
+                targetDbSet.add(targetDb);
             }
             if (StringUtils.isNullOrWhitespaceOnly(database)
                     && !dorisSystem.databaseExists(targetDb)) {
@@ -177,15 +179,35 @@ public abstract class DatabaseSync {
                 int sinkParallel =
                         sinkConfig.getInteger(
                                 DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+                String uidName = getUidName(targetDbSet, dbTbl);
                 sideOutput
                         .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
                         .setParallelism(sinkParallel)
-                        .name(dbTbl.f1)
-                        .uid(dbTbl.f1);
+                        .name(uidName)
+                        .uid(uidName);
             }
         }
     }
 
+    /**
+     * @param targetDbSet The set of target databases.
+     * @param dbTbl The database-table tuple.
+     * @return The UID of the DataStream.
+     */
+    public String getUidName(Set<String> targetDbSet, Tuple2<String, String> dbTbl) {
+        String uidName;
+        // Determine whether to proceed with multi-database synchronization.
+        // if yes, the UID is composed of `dbname_tablename`, otherwise it is composed of
+        // `tablename`.
+        if (targetDbSet.size() > 1) {
+            uidName = dbTbl.f0 + "_" + dbTbl.f1;
+        } else {
+            uidName = dbTbl.f1;
+        }
+
+        return uidName;
+    }
+
     private DorisConnectionOptions getDorisConnectionOptions() {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
         String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org