This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 25783492 [Fix](schema-chaneg) fix default empty to null (#456) 25783492 is described below commit 2578349266f196520e67790d3639066e26f92649 Author: wudi <676366...@qq.com> AuthorDate: Wed Aug 7 14:00:56 2024 +0800 [Fix](schema-chaneg) fix default empty to null (#456) --- .../JsonDebeziumSchemaChangeImplV2.java | 5 +-- .../TestJsonDebeziumSchemaChangeImplV2.java | 38 ++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 94605105..6600dd07 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -296,7 +296,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { } } - private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) { + @VisibleForTesting + public void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) { String fieldName = column.get("name").asText(); String dorisTypeName = buildDorisTypeName(column); String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression")); @@ -315,7 +316,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { } private String handleDefaultValue(String defaultValue) { - if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + if (defaultValue == null) { return null; } if (defaultValue.equals("1970-01-01 00:00:00")) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 3eec0fb6..39241f94 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -543,6 +543,44 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa return filedSchemaMap; } + @Test + public void buildFieldSchemaTest() { + Map<String, FieldSchema> result = new HashMap<>(); + String columnInfo = + "{\"name\":\"order_ts\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"中文注释\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + JsonNode columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("order_ts")); + FieldSchema fieldSchema = result.get("order_ts"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "order_ts"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "datetimev2(0)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), "current_timestamp"); + Assert.assertEquals(fieldSchema.getComment(), "中文注释"); + + columnInfo = + "{\"name\":\"other_no\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":23,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"comment\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("other_no")); + fieldSchema = result.get("other_no"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(150)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), ""); + Assert.assertEquals(fieldSchema.getComment(), "comment"); + } + @After public void after() { mockRestService.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org