This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 857bae3  [improvement] update create table modify string key to 
varchar (#166)
857bae3 is described below

commit 857bae39e9850a63261a246ccbb99396c6c68358
Author: wudi <676366...@qq.com>
AuthorDate: Thu Aug 3 17:58:54 2023 +0800

    [improvement] update create table modify string key to varchar (#166)
---
 .../org/apache/doris/flink/catalog/doris/DorisSystem.java    | 12 ++++++++----
 .../java/org/apache/doris/flink/tools/cdc/SourceSchema.java  |  6 +++++-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index c0e9daa..b635a96 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -155,7 +155,7 @@ public class DorisSystem {
                 throw new CreateTableException("key " + key + " not found in 
column list");
             }
             FieldSchema field = fields.get(key);
-            buildColumn(sb, field);
+            buildColumn(sb, field, true);
         }
 
         //append values
@@ -164,7 +164,7 @@ public class DorisSystem {
                 continue;
             }
             FieldSchema field = entry.getValue();
-            buildColumn(sb, field);
+            buildColumn(sb, field, false);
 
         }
         sb = sb.deleteCharAt(sb.length() -1);
@@ -210,10 +210,14 @@ public class DorisSystem {
         return sb.toString();
     }
 
-    private void buildColumn(StringBuilder sql, FieldSchema field){
+    private void buildColumn(StringBuilder sql, FieldSchema field, boolean 
isKey){
+        String fieldType = field.getTypeString();
+        if(isKey && DorisType.STRING.equals(fieldType)){
+            fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
+        }
         sql.append(identifier(field.getName()))
                 .append(" ")
-                .append(field.getTypeString())
+                .append(fieldType)
                 .append(" COMMENT '")
                 .append(quoteComment(field.getComment()))
                 .append("',");
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index c539a75..9168cb5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -78,13 +78,17 @@ public abstract class SourceSchema {
         TableSchema tableSchema = new TableSchema();
         tableSchema.setModel(this.model);
         tableSchema.setFields(this.fields);
-        tableSchema.setKeys(this.primaryKeys);
+        tableSchema.setKeys(buildKeys());
         tableSchema.setTableComment(this.tableComment);
         tableSchema.setDistributeKeys(buildDistributeKeys());
         tableSchema.setProperties(tableProps);
         return tableSchema;
     }
 
+    private List<String> buildKeys(){
+        return buildDistributeKeys();
+    }
+
     private List<String> buildDistributeKeys(){
         if(!this.primaryKeys.isEmpty()){
             return primaryKeys;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to