This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new efb1272 [Fix] fix decimal parse (#70) efb1272 is described below commit efb127254906a8ee691cb0cf2a76a6ae97bfda5e Author: wudi <676366...@qq.com> AuthorDate: Thu Apr 24 11:09:36 2025 +0800 [Fix] fix decimal parse (#70) --- .../doris/kafka/connector/converter/RecordService.java | 4 ++++ .../doris/kafka/connector/converter/TestRecordService.java | 13 ++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index 81afbd0..51b3aab 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -51,7 +51,9 @@ import org.apache.doris.kafka.connector.writer.LoadConstants; import org.apache.doris.kafka.connector.writer.RecordBuffer; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.DecimalFormat; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +74,8 @@ public class RecordService { this.converter = new JsonConverter(); Map<String, Object> converterConfig = new HashMap<>(); converterConfig.put("schemas.enable", "false"); + converterConfig.put( + JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()); this.converter.configure(converterConfig, false); } diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java index d9688da..da6d2af 100644 --- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java +++ 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java @@ -77,7 +77,7 @@ public class TestRecordService { props.put("debezium.schema.evolution", "basic"); props.put( "doris.topic2table.map", - "avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types"); + "avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types,normal_type:normal_type"); recordService = new RecordService(new DorisOptions((Map) props)); HashMap<String, String> config = new HashMap<>(); jsonConverter.configure(config, false); @@ -239,6 +239,17 @@ public class TestRecordService { "{\"name\":\"doris\",\"key\":\"1\"}\n{\"name\":\"doris\",\"key\":\"2\"}", s); } + @Test + public void processNormalAllTypes() throws IOException { + props.put("converter.mode", "normal"); + recordService = new RecordService(new DorisOptions((Map) props)); + String data = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_z [...] 
+ String target = + "{\"id\":1,\"tiny_c\":127,\"tiny_un_c\":255,\"tiny_un_z_c\":255,\"small_c\":32767,\"small_un_c\":65535,\"small_un_z_c\":65535,\"medium_c\":8388607,\"medium_un_c\":16777215,\"medium_un_z_c\":16777215,\"int_c\":2147483647,\"int_un_c\":4294967295,\"int_un_z_c\":4294967295,\"int11_c\":2147483647,\"big_c\":9223372036854775807,\"big_un_c\":-1,\"big_un_z_c\":-1,\"varchar_c\":\"Hello World\",\"char_c\":\"abc\",\"real_c\":123.102,\"float_c\":123.10199737548828,\"float_un_c\":123.1 [...] + buildProcessStructRecord("normal_type", data, target); + } + @Test public void processMySQlGeoStructRecord() throws IOException { String schemaStr = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org