This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3c2232d [improvement] Oracle no primary key table synchronization (#165) 3c2232d is described below commit 3c2232dec2845be739e995641efb2b6e04668d4f Author: wudi <676366...@qq.com> AuthorDate: Fri Jul 21 14:15:17 2023 +0800 [improvement] Oracle no primary key table synchronization (#165) Co-authored-by: wudi <> --- .../doris/flink/catalog/doris/DorisSystem.java | 12 ++++++----- .../apache/doris/flink/tools/cdc/SourceSchema.java | 25 ++++++++++++++++++++-- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 10 +++------ 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 3e8612e..c0e9daa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -169,11 +169,13 @@ public class DorisSystem { } sb = sb.deleteCharAt(sb.length() -1); sb.append(" ) "); - //append model - sb.append(schema.getModel().name()) - .append(" KEY(") - .append(String.join(",", identifier(schema.getKeys()))) - .append(")"); + //append uniq model + if(DataModel.UNIQUE.equals(schema.getModel())){ + sb.append(schema.getModel().name()) + .append(" KEY(") + .append(String.join(",", identifier(schema.getKeys()))) + .append(")"); + } //append table comment if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){ diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index 6fdbeac..c539a75 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -23,6 +23,7 @@ import org.apache.doris.flink.catalog.doris.TableSchema; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ public abstract class SourceSchema { private final String tableComment; private final LinkedHashMap<String, FieldSchema> fields; public final List<String> primaryKeys; + public DataModel model = DataModel.UNIQUE; public SourceSchema( DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment) @@ -74,15 +76,26 @@ public abstract class SourceSchema { public TableSchema convertTableSchema(Map<String, String> tableProps) { TableSchema tableSchema = new TableSchema(); - tableSchema.setModel(DataModel.UNIQUE); + tableSchema.setModel(this.model); tableSchema.setFields(this.fields); tableSchema.setKeys(this.primaryKeys); tableSchema.setTableComment(this.tableComment); - tableSchema.setDistributeKeys(this.primaryKeys); + tableSchema.setDistributeKeys(buildDistributeKeys()); tableSchema.setProperties(tableProps); return tableSchema; } + private List<String> buildDistributeKeys(){ + if(!this.primaryKeys.isEmpty()){ + return primaryKeys; + } + if(!this.fields.isEmpty()){ + Map.Entry<String, FieldSchema> firstField = this.fields.entrySet().iterator().next(); + return Collections.singletonList(firstField.getKey()); + } + return new ArrayList<>(); + } + public String getDatabaseName() { return databaseName; } @@ -102,4 +115,12 @@ public abstract class SourceSchema { public String getTableComment() { return tableComment; } + + public DataModel getModel() { + return model; + } + + public void setModel(DataModel model) { + this.model = model; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index f90f858..3b4bc31 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -25,6 +25,7 @@ import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -96,13 +97,8 @@ public class OracleDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment); - if (sourceSchema.primaryKeys.size() > 0) { - //Only sync tables with primary keys - schemaList.add(sourceSchema); - } else { - LOG.warn("table {} has no primary key, skip", tableName); - System.out.println("table " + tableName + " has no primary key, skip."); - } + sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + schemaList.add(sourceSchema); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org