This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new efb1272  [Fix] fix decimal parse (#70)
efb1272 is described below

commit efb127254906a8ee691cb0cf2a76a6ae97bfda5e
Author: wudi <676366...@qq.com>
AuthorDate: Thu Apr 24 11:09:36 2025 +0800

    [Fix] fix decimal parse (#70)
---
 .../doris/kafka/connector/converter/RecordService.java      |  4 ++++
 .../doris/kafka/connector/converter/TestRecordService.java  | 13 ++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 81afbd0..51b3aab 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -51,7 +51,9 @@ import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.RecordBuffer;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.DecimalFormat;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +74,8 @@ public class RecordService {
         this.converter = new JsonConverter();
         Map<String, Object> converterConfig = new HashMap<>();
         converterConfig.put("schemas.enable", "false");
+        converterConfig.put(
+                JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
         this.converter.configure(converterConfig, false);
     }
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index d9688da..da6d2af 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -77,7 +77,7 @@ public class TestRecordService {
         props.put("debezium.schema.evolution", "basic");
         props.put(
                 "doris.topic2table.map",
-                "avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types");
+                "avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types,normal_type:normal_type");
         recordService = new RecordService(new DorisOptions((Map) props));
         HashMap<String, String> config = new HashMap<>();
         jsonConverter.configure(config, false);
@@ -239,6 +239,17 @@ public class TestRecordService {
                 
"{\"name\":\"doris\",\"key\":\"1\"}\n{\"name\":\"doris\",\"key\":\"2\"}", s);
     }
 
+    @Test
+    public void processNormalAllTypes() throws IOException {
+        props.put("converter.mode", "normal");
+        recordService = new RecordService(new DorisOptions((Map) props));
+        String data =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_z
 [...]
+        String target =
+                
"{\"id\":1,\"tiny_c\":127,\"tiny_un_c\":255,\"tiny_un_z_c\":255,\"small_c\":32767,\"small_un_c\":65535,\"small_un_z_c\":65535,\"medium_c\":8388607,\"medium_un_c\":16777215,\"medium_un_z_c\":16777215,\"int_c\":2147483647,\"int_un_c\":4294967295,\"int_un_z_c\":4294967295,\"int11_c\":2147483647,\"big_c\":9223372036854775807,\"big_un_c\":-1,\"big_un_z_c\":-1,\"varchar_c\":\"Hello
 
World\",\"char_c\":\"abc\",\"real_c\":123.102,\"float_c\":123.10199737548828,\"float_un_c\":123.1
 [...]
+        buildProcessStructRecord("normal_type", data, target);
+    }
+
     @Test
     public void processMySQlGeoStructRecord() throws IOException {
         String schemaStr =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to