This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new b7b5802f [improve](cdc) add config for uniq index to primary key with create table (#524) b7b5802f is described below commit b7b5802fae187705a53e2793ca2fd0c211550d28 Author: wudi <676366...@qq.com> AuthorDate: Fri Dec 6 10:41:39 2024 +0800 [improve](cdc) add config for uniq index to primary key with create table (#524) --- .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +++++ .../org/apache/doris/flink/tools/cdc/DorisTableConfig.java | 12 ++++++++++++ .../org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java | 5 +---- .../java/org/apache/doris/flink/tools/cdc/SourceSchema.java | 6 +++++- .../container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt | 3 ++- 5 files changed, 25 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index dea4422c..701bd967 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -463,6 +463,11 @@ public abstract class DatabaseSync { private void tryCreateTableIfAbsent( DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) { if (!dorisSystem.tableExists(targetDb, dorisTable)) { + if (dorisTableConfig.isConvertUniqToPk() + && CollectionUtil.isNullOrEmpty(schema.primaryKeys) + && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) { + schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs); + } TableSchema dorisSchema = DorisSchemaFactory.createTableSchema( database, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java index 6f5d929e..6014f249 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java @@ -32,6 +32,7 @@ public class DorisTableConfig implements Serializable { public static final String REPLICATION_NUM = "replication_num"; public static final String TABLE_BUCKETS = "table-buckets"; public static final String TABLE_PARTITIONS = "table-partitions"; + public static final String CONVERT_UNIQ_TO_PK = "convert-uniq-to-pk"; private final Map<String, String> tableProperties; // The specific parameters extracted from --table-conf need to be parsed and integrated into the @@ -39,6 +40,8 @@ public class DorisTableConfig implements Serializable { private Map<String, Integer> tableBuckets; // table:partitionColumn:interval private Map<String, Tuple2<String, String>> tablePartitions; + // uniq index to primary key + private boolean convertUniqToPk = false; // Only for testing @VisibleForTesting @@ -64,6 +67,11 @@ public class DorisTableConfig implements Serializable { tableConfig.remove(TABLE_PARTITIONS); } + if (tableConfig.containsKey(CONVERT_UNIQ_TO_PK)) { + this.convertUniqToPk = Boolean.parseBoolean(tableConfig.get(CONVERT_UNIQ_TO_PK)); + tableConfig.remove(CONVERT_UNIQ_TO_PK); + } + tableProperties = tableConfig; } @@ -79,6 +87,10 @@ public class DorisTableConfig implements Serializable { return tablePartitions; } + public boolean isConvertUniqToPk() { + return convertUniqToPk; + } + /** * Build table bucket Map. 
* diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java index 2547b976..aa64037b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -49,10 +49,7 @@ public abstract class JdbcSourceSchema extends SourceSchema { super(databaseName, schemaName, tableName, tableComment); fields = getColumnInfo(metaData, databaseName, schemaName, tableName); primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName); - if (primaryKeys.isEmpty()) { - List<String> uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName); - primaryKeys.addAll(uniqIndex); - } + uniqueIndexs = getUniqIndex(metaData, databaseName, schemaName, tableName); } public LinkedHashMap<String, FieldSchema> getColumnInfo( diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index de3e7975..aed1754c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -37,6 +37,7 @@ public abstract class SourceSchema { protected final String tableComment; protected LinkedHashMap<String, FieldSchema> fields; public List<String> primaryKeys; + public List<String> uniqueIndexs; public DataModel model = DataModel.UNIQUE; public SourceSchema( @@ -64,7 +65,6 @@ public abstract class SourceSchema { if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) { identifier.add(schemaName); } - if (!StringUtils.isNullOrWhitespaceOnly(tableName)) { identifier.add(tableName); } @@ -115,6 +115,10 @@ public abstract class SourceSchema { return primaryKeys; } + 
public List<String> getUniqueIndexs() { + return uniqueIndexs; + } + public String getTableComment() { return tableComment; } diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt index 632c3735..053dc9ef 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt @@ -2,4 +2,5 @@ mysql-sync-database --including-tables "create_tbl_.*" --create-table-only --table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30 - --table-conf replication_num=1 \ No newline at end of file + --table-conf replication_num=1 + --table-conf convert-uniq-to-pk=true \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org