This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e49f7e4  [fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
e49f7e4 is described below

commit e49f7e49cebd561ba12b9a0d6e933248d6a43b92
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon May 13 14:07:15 2024 +0800

    [fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 632edcc..a1f511a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -129,12 +129,14 @@ public abstract class DatabaseSync {
             tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
         }
         Set<String> bucketsTable = new HashSet<>();
+        Set<String> targetDbSet = new HashSet<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
             String targetDb = database;
             // Synchronize multiple databases using the src database name
             if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
                 targetDb = schema.getDatabaseName();
+                targetDbSet.add(targetDb);
             }
             if (StringUtils.isNullOrWhitespaceOnly(database)
                     && !dorisSystem.databaseExists(targetDb)) {
@@ -177,15 +179,35 @@ public abstract class DatabaseSync {
                 int sinkParallel =
                         sinkConfig.getInteger(
                                 DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+                String uidName = getUidName(targetDbSet, dbTbl);
                 sideOutput
                         .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
                         .setParallelism(sinkParallel)
-                        .name(dbTbl.f1)
-                        .uid(dbTbl.f1);
+                        .name(uidName)
+                        .uid(uidName);
             }
         }
     }
 
+    /**
+     * @param targetDbSet The set of target databases.
+     * @param dbTbl The database-table tuple.
+     * @return The UID of the DataStream.
+     */
+    public String getUidName(Set<String> targetDbSet, Tuple2<String, String> dbTbl) {
+        String uidName;
+        // Determine whether to proceed with multi-database synchronization.
+        // if yes, the UID is composed of `dbname_tablename`, otherwise it is composed of
+        // `tablename`.
+        if (targetDbSet.size() > 1) {
+            uidName = dbTbl.f0 + "_" + dbTbl.f1;
+        } else {
+            uidName = dbTbl.f1;
+        }
+
+        return uidName;
+    }
+
     private DorisConnectionOptions getDorisConnectionOptions() {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
         String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to