This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4feeb  [Bug] fix can not create table problem (#252)
ef4feeb is described below

commit ef4feeb17aabf007c6ba763d8a76f33a26baa7a3
Author: wudi <676366...@qq.com>
AuthorDate: Thu Nov 30 19:32:58 2023 +0800

    [Bug] fix can not create table problem (#252)
    
    Co-authored-by: wudi <>
---
 .../serializer/JsonDebeziumSchemaSerializer.java   | 52 ++++++++++++++++------
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  2 +
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  4 +-
 3 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 29d9cee..25d06ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -93,6 +93,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private SchemaChangeManager schemaChangeManager;
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
+    // create table properties
+    private Map<String, String> tableProperties;
+    private String targetDatabase;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
             Pattern pattern,
@@ -134,24 +137,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
             String sourceTableName,
             boolean newSchemaChange,
             DorisExecutionOptions executionOptions,
-            Map<String, String> tableMapping) {
+            Map<String, String> tableMapping,
+            Map<String, String> tableProperties,
+            String targetDatabase) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
         this.tableMapping = tableMapping;
+        this.tableProperties = tableProperties;
+        this.targetDatabase = targetDatabase;
     }
 
     @Override
     public DorisRecord serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
-
-        //Filter out table records that are not in tableMapping
-        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
-        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
-        if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
-            LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
-            return null;
-        }
-
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             // schema change ddl
@@ -166,6 +164,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         if (newSchemaChange && firstLoad) {
             initOriginFieldSchema(recordRoot);
         }
+
+        //Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+        if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+            LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
+            return null;
+        }
+
         Map<String, Object> valueMap;
         switch (op) {
             case OP_READ:
@@ -313,11 +320,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
 
     @VisibleForTesting
     public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        if(sourceConnector == null){
+            sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
+        }
+
         String dorisTable = getCreateTableIdentifier(record);
         JsonNode tableChange =  extractTableChange(record);
         JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
         JsonNode columns = tableChange.get("table").get("columns");
-        String tblComment = tableChange.get("table").get("comment").asText();
+        JsonNode comment = tableChange.get("table").get("comment");
+        String tblComment = comment == null ? "" : comment.asText();
         Map<String, FieldSchema> field = new LinkedHashMap<>();
         for (JsonNode column : columns) {
             buildFieldSchema(field, column);
@@ -333,6 +345,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         tableSchema.setKeys(pkList);
         tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
         tableSchema.setTableComment(tblComment);
+        tableSchema.setProperties(tableProperties);
 
         String[] split = dorisTable.split("\\.");
         Preconditions.checkArgument(split.length == 2);
@@ -402,9 +415,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     public String getCreateTableIdentifier(JsonNode record){
-        String db = extractJsonNode(record.get("source"), "db");
         String table = extractJsonNode(record.get("source"), "table");
-        return db + "." + table;
+        return targetDatabase + "." + table;
     }
 
     public String getDorisTableIdentifier(String cdcTableIdentifier){
@@ -657,6 +669,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         private boolean newSchemaChange;
         private DorisExecutionOptions executionOptions;
         private Map<String, String> tableMapping;
+        private Map<String, String> tableProperties;
+        private String targetDatabase;
 
         public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -688,9 +702,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
             return this;
         }
 
+        public Builder setTableProperties(Map<String, String> tableProperties) {
+            this.tableProperties = tableProperties;
+            return this;
+        }
+
+        public Builder setTargetDatabase(String targetDatabase) {
+            this.targetDatabase = targetDatabase;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
             return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
-                    executionOptions, tableMapping);
+                    executionOptions, tableMapping, tableProperties, targetDatabase);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 02ab034..fe5357e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -242,6 +242,8 @@ public abstract class DatabaseSync {
                         .setNewSchemaChange(newSchemaChange)
                         .setExecutionOptions(executionOptions)
                         .setTableMapping(tableMapping)
+                        .setTableProperties(tableConfig)
+                        .setTargetDatabase(database)
                         .build())
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a3e01d3..a86b2ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,9 +122,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
                 .username(config.get(MySqlSourceOptions.USERNAME))
                 .password(config.get(MySqlSourceOptions.PASSWORD))
                 .databaseList(databaseName)
-                .tableList(tableName)
-                //default open add newly table
-                .scanNewlyAddedTableEnabled(true);
+                .tableList(tableName);
 
         config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
         config


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to