This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 25783492 [Fix](schema-chaneg) fix default empty to null (#456)
25783492 is described below

commit 2578349266f196520e67790d3639066e26f92649
Author: wudi <676366...@qq.com>
AuthorDate: Wed Aug 7 14:00:56 2024 +0800

    [Fix](schema-chaneg) fix default empty to null (#456)
---
 .../JsonDebeziumSchemaChangeImplV2.java            |  5 +--
 .../TestJsonDebeziumSchemaChangeImplV2.java        | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 94605105..6600dd07 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -296,7 +296,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         }
     }
 
-    private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, 
JsonNode column) {
+    @VisibleForTesting
+    public void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, 
JsonNode column) {
         String fieldName = column.get("name").asText();
         String dorisTypeName = buildDorisTypeName(column);
         String defaultValue = handleDefaultValue(extractJsonNode(column, 
"defaultValueExpression"));
@@ -315,7 +316,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     }
 
     private String handleDefaultValue(String defaultValue) {
-        if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+        if (defaultValue == null) {
             return null;
         }
         if (defaultValue.equals("1970-01-01 00:00:00")) {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 3eec0fb6..39241f94 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -543,6 +543,44 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         return filedSchemaMap;
     }
 
+    @Test
+    public void buildFieldSchemaTest() {
+        Map<String, FieldSchema> result = new HashMap<>();
+        String columnInfo =
+                
"{\"name\":\"order_ts\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"中文注释\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01
 00:00:00\",\"enumValues\":[]}\n";
+        schemaChange.setSourceConnector("mysql");
+        JsonNode columns = null;
+        try {
+            columns = objectMapper.readTree(columnInfo);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        schemaChange.buildFieldSchema(result, columns);
+        Assert.assertTrue(result.containsKey("order_ts"));
+        FieldSchema fieldSchema = result.get("order_ts");
+        Assert.assertEquals(fieldSchema.getName().toLowerCase(), "order_ts");
+        Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), 
"datetimev2(0)");
+        Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), 
"current_timestamp");
+        Assert.assertEquals(fieldSchema.getComment(), "中文注释");
+
+        columnInfo =
+                
"{\"name\":\"other_no\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":23,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"comment\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"\",\"enumValues\":[]}\n";
+        schemaChange.setSourceConnector("mysql");
+        columns = null;
+        try {
+            columns = objectMapper.readTree(columnInfo);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        schemaChange.buildFieldSchema(result, columns);
+        Assert.assertTrue(result.containsKey("other_no"));
+        fieldSchema = result.get("other_no");
+        Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no");
+        Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), 
"varchar(150)");
+        Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), "");
+        Assert.assertEquals(fieldSchema.getComment(), "comment");
+    }
+
     @After
     public void after() {
         mockRestService.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to