This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c2232d  [improvement] Oracle no primary key table synchronization (#165)
3c2232d is described below

commit 3c2232dec2845be739e995641efb2b6e04668d4f
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jul 21 14:15:17 2023 +0800

    [improvement] Oracle no primary key table synchronization (#165)
    
    Co-authored-by: wudi <>
---
 .../doris/flink/catalog/doris/DorisSystem.java     | 12 ++++++-----
 .../apache/doris/flink/tools/cdc/SourceSchema.java | 25 ++++++++++++++++++++--
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 10 +++------
 3 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 3e8612e..c0e9daa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -169,11 +169,13 @@ public class DorisSystem {
         }
         sb = sb.deleteCharAt(sb.length() -1);
         sb.append(" ) ");
-        //append model
-        sb.append(schema.getModel().name())
-                .append(" KEY(")
-                .append(String.join(",", identifier(schema.getKeys())))
-                .append(")");
+        //append uniq model
+        if(DataModel.UNIQUE.equals(schema.getModel())){
+            sb.append(schema.getModel().name())
+                    .append(" KEY(")
+                    .append(String.join(",", identifier(schema.getKeys())))
+                    .append(")");
+        }
 
         //append table comment
         if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 6fdbeac..c539a75 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -23,6 +23,7 @@ import org.apache.doris.flink.catalog.doris.TableSchema;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ public abstract class SourceSchema {
     private final String tableComment;
     private final LinkedHashMap<String, FieldSchema> fields;
     public final List<String> primaryKeys;
+    public DataModel model = DataModel.UNIQUE;
 
     public SourceSchema(
             DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment)
@@ -74,15 +76,26 @@ public abstract class SourceSchema {
 
     public TableSchema convertTableSchema(Map<String, String> tableProps) {
         TableSchema tableSchema = new TableSchema();
-        tableSchema.setModel(DataModel.UNIQUE);
+        tableSchema.setModel(this.model);
         tableSchema.setFields(this.fields);
         tableSchema.setKeys(this.primaryKeys);
         tableSchema.setTableComment(this.tableComment);
-        tableSchema.setDistributeKeys(this.primaryKeys);
+        tableSchema.setDistributeKeys(buildDistributeKeys());
         tableSchema.setProperties(tableProps);
         return tableSchema;
     }
 
+    private List<String> buildDistributeKeys(){
+        if(!this.primaryKeys.isEmpty()){
+            return primaryKeys;
+        }
+        if(!this.fields.isEmpty()){
+            Map.Entry<String, FieldSchema> firstField = this.fields.entrySet().iterator().next();
+            return Collections.singletonList(firstField.getKey());
+        }
+        return new ArrayList<>();
+    }
+
     public String getDatabaseName() {
         return databaseName;
     }
@@ -102,4 +115,12 @@ public abstract class SourceSchema {
     public String getTableComment() {
         return tableComment;
     }
+
+    public DataModel getModel() {
+        return model;
+    }
+
+    public void setModel(DataModel model) {
+        this.model = model;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index f90f858..3b4bc31 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -25,6 +25,7 @@ import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -96,13 +97,8 @@ public class OracleDatabaseSync extends DatabaseSync {
                     }
                     SourceSchema sourceSchema =
                             new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment);
-                    if (sourceSchema.primaryKeys.size() > 0) {
-                        //Only sync tables with primary keys
-                        schemaList.add(sourceSchema);
-                    } else {
-                        LOG.warn("table {} has no primary key, skip", tableName);
-                        System.out.println("table " + tableName + " has no primary key, skip.");
-                    }
+                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    schemaList.add(sourceSchema);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to