This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b7b5802f [improve](cdc) add config for uniq index to primary key with 
create table (#524)
b7b5802f is described below

commit b7b5802fae187705a53e2793ca2fd0c211550d28
Author: wudi <676366...@qq.com>
AuthorDate: Fri Dec 6 10:41:39 2024 +0800

    [improve](cdc) add config for uniq index to primary key with create table 
(#524)
---
 .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java  |  5 +++++
 .../org/apache/doris/flink/tools/cdc/DorisTableConfig.java   | 12 ++++++++++++
 .../org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java   |  5 +----
 .../java/org/apache/doris/flink/tools/cdc/SourceSchema.java  |  6 +++++-
 .../container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt |  3 ++-
 5 files changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index dea4422c..701bd967 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -463,6 +463,11 @@ public abstract class DatabaseSync {
     private void tryCreateTableIfAbsent(
             DorisSystem dorisSystem, String targetDb, String dorisTable, 
SourceSchema schema) {
         if (!dorisSystem.tableExists(targetDb, dorisTable)) {
+            if (dorisTableConfig.isConvertUniqToPk()
+                    && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
+                    && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
+                schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
+            }
             TableSchema dorisSchema =
                     DorisSchemaFactory.createTableSchema(
                             database,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
index 6f5d929e..6014f249 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -32,6 +32,7 @@ public class DorisTableConfig implements Serializable {
     public static final String REPLICATION_NUM = "replication_num";
     public static final String TABLE_BUCKETS = "table-buckets";
     public static final String TABLE_PARTITIONS = "table-partitions";
+    public static final String CONVERT_UNIQ_TO_PK = "convert-uniq-to-pk";
 
     private final Map<String, String> tableProperties;
     // The specific parameters extracted from --table-conf need to be parsed 
and integrated into the
@@ -39,6 +40,8 @@ public class DorisTableConfig implements Serializable {
     private Map<String, Integer> tableBuckets;
     // table:partitionColumn:interval
     private Map<String, Tuple2<String, String>> tablePartitions;
+    // uniq index to primary key
+    private boolean convertUniqToPk = false;
 
     // Only for testing
     @VisibleForTesting
@@ -64,6 +67,11 @@ public class DorisTableConfig implements Serializable {
             tableConfig.remove(TABLE_PARTITIONS);
         }
 
+        if (tableConfig.containsKey(CONVERT_UNIQ_TO_PK)) {
+            this.convertUniqToPk = 
Boolean.parseBoolean(tableConfig.get(CONVERT_UNIQ_TO_PK));
+            tableConfig.remove(CONVERT_UNIQ_TO_PK);
+        }
+
         tableProperties = tableConfig;
     }
 
@@ -79,6 +87,10 @@ public class DorisTableConfig implements Serializable {
         return tablePartitions;
     }
 
+    public boolean isConvertUniqToPk() {
+        return convertUniqToPk;
+    }
+
     /**
      * Build table bucket Map.
      *
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 2547b976..aa64037b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -49,10 +49,7 @@ public abstract class JdbcSourceSchema extends SourceSchema {
         super(databaseName, schemaName, tableName, tableComment);
         fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
         primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, 
tableName);
-        if (primaryKeys.isEmpty()) {
-            List<String> uniqIndex = getUniqIndex(metaData, databaseName, 
schemaName, tableName);
-            primaryKeys.addAll(uniqIndex);
-        }
+        uniqueIndexs = getUniqIndex(metaData, databaseName, schemaName, 
tableName);
     }
 
     public LinkedHashMap<String, FieldSchema> getColumnInfo(
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index de3e7975..aed1754c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -37,6 +37,7 @@ public abstract class SourceSchema {
     protected final String tableComment;
     protected LinkedHashMap<String, FieldSchema> fields;
     public List<String> primaryKeys;
+    public List<String> uniqueIndexs;
     public DataModel model = DataModel.UNIQUE;
 
     public SourceSchema(
@@ -64,7 +65,6 @@ public abstract class SourceSchema {
         if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) {
             identifier.add(schemaName);
         }
-
         if (!StringUtils.isNullOrWhitespaceOnly(tableName)) {
             identifier.add(tableName);
         }
@@ -115,6 +115,10 @@ public abstract class SourceSchema {
         return primaryKeys;
     }
 
+    public List<String> getUniqueIndexs() {
+        return uniqueIndexs;
+    }
+
     public String getTableComment() {
         return tableComment;
     }
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
index 632c3735..053dc9ef 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
@@ -2,4 +2,5 @@ mysql-sync-database
     --including-tables "create_tbl_.*"
     --create-table-only
     --table-conf 
table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
-    --table-conf replication_num=1
\ No newline at end of file
+    --table-conf replication_num=1
+    --table-conf convert-uniq-to-pk=true
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to