This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e8d00b  [improve]support string type record in one topic to multiple tables (#59)
6e8d00b is described below

commit 6e8d00b0dc47a3738b2c5d8d5d356e74c3906511
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Tue Dec 31 19:16:21 2024 +0800

    [improve]support string type record in one topic to multiple tables (#59)
---
 .../connector/service/DorisDefaultSinkService.java | 37 +++++++++++++---
 .../connector/service/TestDorisSinkService.java    | 50 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 7870b9d..36f78fe 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -20,8 +20,11 @@
 package org.apache.doris.kafka.connector.service;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +42,7 @@ import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,9 +64,11 @@ public class DorisDefaultSinkService implements DorisSinkService {
     private final DorisOptions dorisOptions;
     private final MetricsJmxReporter metricsJmxReporter;
     private final DorisConnectMonitor connectMonitor;
+    private final ObjectMapper objectMapper;
 
     DorisDefaultSinkService(Map<String, String> config) {
         this.dorisOptions = new DorisOptions(config);
+        this.objectMapper = new ObjectMapper();
         this.writer = new HashMap<>();
         this.conn = new JdbcConnectionProvider(dorisOptions);
         MetricRegistry metricRegistry = new MetricRegistry();
@@ -202,19 +208,36 @@ public class DorisDefaultSinkService implements DorisSinkService {
         if (StringUtils.isEmpty(field)) {
             return defaultTableName;
         }
-        if (!(record.value() instanceof Map)) {
+        return parseRecordTableName(defaultTableName, field, record);
+    }
+
+    private String parseRecordTableName(
+            String defaultTableName, String tableNameField, SinkRecord record) {
+        Object recordValue = record.value();
+        Map<String, Object> recordMap = Collections.emptyMap();
+        if (recordValue instanceof Struct) {
             LOG.warn(
-                    "Only Map objects supported for The 'record.tablename.field' configuration, field={}, record type={}",
-                    field,
-                    record.value().getClass().getName());
+                    "The Struct type record not supported for The 'record.tablename.field' configuration, field={}",
+                    tableNameField);
             return defaultTableName;
+        } else if (recordValue instanceof Map) {
+            recordMap = (Map<String, Object>) recordValue;
+        } else if (recordValue instanceof String) {
+            try {
+                recordMap = objectMapper.readValue((String) recordValue, Map.class);
+            } catch (JsonProcessingException e) {
+                LOG.warn(
+                        "The String type record failed to parse record value to map. record={}, field={}",
+                        recordValue,
+                        tableNameField,
+                        e);
+            }
         }
-        Map<String, Object> map = (Map<String, Object>) record.value();
         // if the field is not found in the record, use the table name in the config
-        if (map.get(field) == null) {
+        if (recordMap == null || recordMap.get(tableNameField) == null) {
             return defaultTableName;
         }
-        return map.get(field).toString();
+        return recordMap.get(tableNameField).toString();
     }
 
     private static String getNameIndex(String topic, int partition) {
diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index b947405..7ea1c2a 100644
--- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -21,12 +21,16 @@ package org.apache.doris.kafka.connector.service;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,6 +39,7 @@ import org.junit.Test;
 public class TestDorisSinkService {
 
     private DorisDefaultSinkService dorisDefaultSinkService;
+    private final JsonConverter jsonConverter = new JsonConverter();
 
     @Before
     public void init() throws IOException {
@@ -49,6 +54,7 @@ public class TestDorisSinkService {
         props.put("name", "sink-connector-test");
         props.put("record.tablename.field", "table_name");
         dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+        jsonConverter.configure(new HashMap<>(), false);
     }
 
     @Test
@@ -81,5 +87,49 @@ public class TestDorisSinkService {
                         1);
         Assert.assertEquals(
                 "appoint_table", dorisDefaultSinkService.getSinkDorisTableName(record2));
+
+        String recordValue3 = "{\"id\":1,\"name\":\"bob\",\"age\":12}";
+        SinkRecord record3 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        recordValue3,
+                        3);
+        Assert.assertEquals(
+                "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record3));
+
+        String recordValue4 =
+                "{\"id\":12,\"name\":\"jack\",\"age\":13,\"table_name\":\"appoint_table2\"}";
+        SinkRecord record4 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        recordValue4,
+                        3);
+        Assert.assertEquals(
+                "appoint_table2", dorisDefaultSinkService.getSinkDorisTableName(record4));
+
+        String structMsg =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.t
 [...]
+        SchemaAndValue schemaAndValue =
+                jsonConverter.toConnectData(
+                        "topic_test", structMsg.getBytes(StandardCharsets.UTF_8));
+        SinkRecord record5 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        schemaAndValue.schema(),
+                        new Struct(schemaAndValue.schema()),
+                        3);
+        Assert.assertEquals(
+                "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record5));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to