This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 6e8d00b [improve]support string type record in one topic to multiple tables (#59) 6e8d00b is described below commit 6e8d00b0dc47a3738b2c5d8d5d356e74c3906511 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Dec 31 19:16:21 2024 +0800 [improve]support string type record in one topic to multiple tables (#59) --- .../connector/service/DorisDefaultSinkService.java | 37 +++++++++++++--- .../connector/service/TestDorisSinkService.java | 50 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index 7870b9d..36f78fe 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -20,8 +20,11 @@ package org.apache.doris.kafka.connector.service; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +42,7 @@ import org.apache.doris.kafka.connector.writer.StreamLoadWriter; import org.apache.doris.kafka.connector.writer.load.LoadModel; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,9 +64,11 @@ public class DorisDefaultSinkService implements DorisSinkService { private final DorisOptions dorisOptions; private final MetricsJmxReporter metricsJmxReporter; private final DorisConnectMonitor connectMonitor; + private final ObjectMapper objectMapper; DorisDefaultSinkService(Map<String, String> config) { this.dorisOptions = new DorisOptions(config); + this.objectMapper = new ObjectMapper(); this.writer = new HashMap<>(); this.conn = new JdbcConnectionProvider(dorisOptions); MetricRegistry metricRegistry = new MetricRegistry(); @@ -202,19 +208,36 @@ public class DorisDefaultSinkService implements DorisSinkService { if (StringUtils.isEmpty(field)) { return defaultTableName; } - if (!(record.value() instanceof Map)) { + return parseRecordTableName(defaultTableName, field, record); + } + + private String parseRecordTableName( + String defaultTableName, String tableNameField, SinkRecord record) { + Object recordValue = record.value(); + Map<String, Object> recordMap = Collections.emptyMap(); + if (recordValue instanceof Struct) { LOG.warn( - "Only Map objects supported for The 'record.tablename.field' configuration, field={}, record type={}", - field, - record.value().getClass().getName()); + "The Struct type record not supported for The 'record.tablename.field' configuration, field={}", + tableNameField); return defaultTableName; + } else if (recordValue instanceof Map) { + recordMap = (Map<String, Object>) recordValue; + } else if (recordValue instanceof String) { + try { + recordMap = objectMapper.readValue((String) recordValue, Map.class); + } catch (JsonProcessingException e) { + LOG.warn( + "The String type record failed to parse record value to map. record={}, field={}", + recordValue, + tableNameField, + e); + } } - Map<String, Object> map = (Map<String, Object>) record.value(); // if the field is not found in the record, use the table name in the config - if (map.get(field) == null) { + if (!recordMap.containsKey(tableNameField)) { return defaultTableName; } - return map.get(field).toString(); + return recordMap.get(tableNameField).toString(); } private static String getNameIndex(String topic, int partition) { diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java index b947405..7ea1c2a 100644 --- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java +++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java @@ -21,12 +21,16 @@ package org.apache.doris.kafka.connector.service; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Assert; import org.junit.Before; @@ -35,6 +39,7 @@ import org.junit.Test; public class TestDorisSinkService { private DorisDefaultSinkService dorisDefaultSinkService; + private JsonConverter jsonConverter = new JsonConverter(); @Before public void init() throws IOException { @@ -49,6 +54,7 @@ public class TestDorisSinkService { props.put("name", "sink-connector-test"); props.put("record.tablename.field", "table_name"); dorisDefaultSinkService = new DorisDefaultSinkService((Map) props); + jsonConverter.configure(new HashMap<>(), false); } @Test @@ -81,5 +87,49 @@ public class TestDorisSinkService { 1); Assert.assertEquals( "appoint_table", dorisDefaultSinkService.getSinkDorisTableName(record2)); + + String recordValue3 = "{\"id\":1,\"name\":\"bob\",\"age\":12}"; + SinkRecord record3 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + recordValue3, + 3); + Assert.assertEquals( + "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record3)); + + String recordValue4 = + "{\"id\":12,\"name\":\"jack\",\"age\":13,\"table_name\":\"appoint_table2\"}"; + SinkRecord record4 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + recordValue4, + 3); + Assert.assertEquals( + "appoint_table2", dorisDefaultSinkService.getSinkDorisTableName(record4)); + + String structMsg = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.t [...] + SchemaAndValue schemaAndValue = + jsonConverter.toConnectData( + "topic_test", structMsg.getBytes(StandardCharsets.UTF_8)); + SinkRecord record5 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + new Struct(schemaAndValue.schema()), + 3); + Assert.assertEquals( + "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record5)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org