This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 56b4668 [Improve] add behavior on null values (#69) 56b4668 is described below commit 56b4668696b3c33d044ecd7cc90e9b79dab18190 Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com> AuthorDate: Tue Apr 29 10:50:13 2025 +0800 [Improve] add behavior on null values (#69) --- .../doris/kafka/connector/cfg/DorisOptions.java | 9 +++ .../connector/cfg/DorisSinkConnectorConfig.java | 13 ++++- .../connector/model/BehaviorOnNullValues.java | 40 +++++++++++++ .../connector/service/DorisDefaultSinkService.java | 39 ++++++++++--- .../kafka/connector/utils/ConfigCheckUtils.java | 10 ++++ .../cfg/TestDorisSinkConnectorConfig.java | 7 +++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++ .../connector/service/TestDorisSinkService.java | 67 +++++++++++++++++++++- .../e2e/string_converter/null_values_default.json | 23 ++++++++ .../e2e/string_converter/null_values_fail.json | 24 ++++++++ .../e2e/string_converter/null_values_ignore.json | 24 ++++++++ .../e2e/string_converter/null_values_tab.sql | 12 ++++ 12 files changed, 314 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index c1a2cbd..922eee6 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -28,6 +28,7 @@ import java.util.Properties; import java.util.stream.Collectors; import org.apache.doris.kafka.connector.converter.ConverterMode; import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; +import org.apache.doris.kafka.connector.model.BehaviorOnNullValues; import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; import org.apache.doris.kafka.connector.writer.load.LoadModel; @@ -68,6 
+69,7 @@ public class DorisOptions { private final SchemaEvolutionMode schemaEvolutionMode; private final int maxRetries; private final int retryIntervalMs; + private final BehaviorOnNullValues behaviorOnNullValues; public DorisOptions(Map<String, String> config) { this.name = config.get(DorisSinkConnectorConfig.NAME); @@ -140,6 +142,9 @@ public class DorisOptions { DorisSinkConnectorConfig.RETRY_INTERVAL_MS, String.valueOf( DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT))); + this.behaviorOnNullValues = + BehaviorOnNullValues.of( + config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES)); } private Properties getStreamLoadPropFromConfig(Map<String, String> config) { @@ -341,4 +346,8 @@ public class DorisOptions { public int getRetryIntervalMs() { return retryIntervalMs; } + + public BehaviorOnNullValues getBehaviorOnNullValues() { + return behaviorOnNullValues; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 53442ca..5b8807e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.doris.kafka.connector.DorisSinkConnector; import org.apache.doris.kafka.connector.converter.ConverterMode; import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; +import org.apache.doris.kafka.connector.model.BehaviorOnNullValues; import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; import org.apache.doris.kafka.connector.writer.load.LoadModel; @@ -95,6 +96,9 @@ public class DorisSinkConnectorConfig { public static final String RETRY_INTERVAL_MS = "retry.interval.ms"; public static final int RETRY_INTERVAL_MS_DEFAULT = 6000; + public static final String 
BEHAVIOR_ON_NULL_VALUES = "behavior.on.null.values"; + public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = BehaviorOnNullValues.IGNORE.name(); + // metrics public static final String JMX_OPT = "jmx"; public static final boolean JMX_OPT_DEFAULT = true; @@ -125,6 +129,7 @@ public class DorisSinkConnectorConfig { setFieldToDefaultValues(config, MAX_RETRIES, String.valueOf(MAX_RETRIES_DEFAULT)); setFieldToDefaultValues( config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT)); + setFieldToDefaultValues(config, BEHAVIOR_ON_NULL_VALUES, BEHAVIOR_ON_NULL_VALUES_DEFAULT); } public static Map<String, String> convertToLowercase(Map<String, String> config) { @@ -291,7 +296,13 @@ public class DorisSinkConnectorConfig { Type.INT, RETRY_INTERVAL_MS_DEFAULT, Importance.MEDIUM, - "The time in milliseconds to wait following an error before a retry attempt is made."); + "The time in milliseconds to wait following an error before a retry attempt is made.") + .define( + BEHAVIOR_ON_NULL_VALUES, + Type.STRING, + BEHAVIOR_ON_NULL_VALUES_DEFAULT, + Importance.LOW, + "Used to handle records with a null value ."); } public static class TopicToTableValidator implements ConfigDef.Validator { diff --git a/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java new file mode 100644 index 0000000..4398c6d --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.model; + +public enum BehaviorOnNullValues { + IGNORE("ignore"), + + FAIL("fail"); + + private String name; + + BehaviorOnNullValues(String name) { + this.name = name; + } + + public static BehaviorOnNullValues of(String name) { + return BehaviorOnNullValues.valueOf(name.toUpperCase()); + } + + public static String[] instances() { + return new String[] {IGNORE.name, FAIL.name}; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index 3a220f1..e091264 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -32,10 +32,12 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.DorisSinkTask; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.connection.ConnectionProvider; import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider; import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter; +import org.apache.doris.kafka.connector.model.BehaviorOnNullValues; import org.apache.doris.kafka.connector.writer.CopyIntoWriter; import 
org.apache.doris.kafka.connector.writer.DorisWriter; import org.apache.doris.kafka.connector.writer.StreamLoadWriter; @@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -126,13 +129,8 @@ public class DorisDefaultSinkService implements DorisSinkService { public void insert(final Collection<SinkRecord> records) { // note that records can be empty for (SinkRecord record : records) { - // skip null value records - if (record.value() == null) { - LOG.debug( - "Null valued record from topic '{}', partition {} and offset {} was skipped", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset()); + // skip records + if (shouldSkipRecord(record)) { continue; } // check topic mutating SMTs @@ -212,6 +210,33 @@ public class DorisDefaultSinkService implements DorisSinkService { } } + @VisibleForTesting + public boolean shouldSkipRecord(SinkRecord record) { + if (record.value() == null) { + switch (dorisOptions.getBehaviorOnNullValues()) { + case FAIL: + throw new DataException( + String.format( + "Null valued record from topic %s, partition %s and offset %s was failed " + + "(the configuration property '%s' is '%s').", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES, + BehaviorOnNullValues.FAIL)); + case IGNORE: + default: + LOG.debug( + "Null valued record from topic '{}', partition {} and offset {} was skipped", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset()); + return true; + } + } + return false; + } + /** * Get the table name in doris for the given record. 
* diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index d7a299f..465b7dc 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -32,6 +32,7 @@ import org.apache.doris.kafka.connector.converter.ConverterMode; import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; import org.apache.doris.kafka.connector.exception.ArgumentsException; import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.model.BehaviorOnNullValues; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; import org.apache.doris.kafka.connector.writer.LoadConstants; import org.apache.doris.kafka.connector.writer.load.GroupCommitMode; @@ -200,6 +201,15 @@ public class ConfigCheckUtils { configIsValid = false; } + String behaviorOnNullValues = config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES); + if (!validateEnumInstances(behaviorOnNullValues, BehaviorOnNullValues.instances())) { + LOG.error( + "The value of {} is an illegal parameter of {}.", + behaviorOnNullValues, + DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES); + configIsValid = false; + } + if (!configIsValid) { throw new DorisException( "input kafka connector configuration is null, missing required values, or wrong input value"); diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java index 5caa668..e6633ec 100644 --- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java +++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java @@ -274,6 +274,13 @@ public class TestDorisSinkConnectorConfig { 
ConfigCheckUtils.validateConfig(config); } + @Test(expected = DorisException.class) + public void testBehaviorOnNullValuesException() { + Map<String, String> config = getConfig(); + config.put(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES, "create"); + ConfigCheckUtils.validateConfig(config); + } + @Test public void testSchemaEvolutionMode() { Map<String, String> config = getConfig(); diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 4430dad..9ec116f 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -331,6 +331,61 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expectedResult, query1, 3); } + @Test + public void testBehaviorOnNullValues() throws Exception { + // default + initialize("src/test/resources/e2e/string_converter/null_values_default.json"); + String tableSql = + loadContent("src/test/resources/e2e/string_converter/null_values_tab.sql"); + createTable(tableSql); + Thread.sleep(2000); + + String topic = "behavior_on_null_values_test"; + String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}"; + produceMsg2Kafka(topic, null); + produceMsg2Kafka(topic, msg1); + + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + List<String> expected = Collections.singletonList("1,col1,col2"); + Thread.sleep(10000); + String query1 = + String.format( + "select id,col1,col2 from %s.%s order by id", database, "null_values_tab"); + checkResult(expected, query1, 3); + + kafkaContainerService.deleteKafkaConnector(connectorName); + + // ignore + initialize("src/test/resources/e2e/string_converter/null_values_ignore.json"); + produceMsg2Kafka(topic, null); + String msg2 = 
"{\"id\":2,\"col1\":\"col1\",\"col2\":\"col2\"}"; + produceMsg2Kafka(topic, msg2); + + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + Thread.sleep(10000); + expected = Arrays.asList("1,col1,col2", "2,col1,col2"); + checkResult(expected, query1, 3); + + kafkaContainerService.deleteKafkaConnector(connectorName); + + // fail + initialize("src/test/resources/e2e/string_converter/null_values_fail.json"); + produceMsg2Kafka(topic, null); + + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + Thread.sleep(10000); + Assert.assertEquals("FAILED", kafkaContainerService.getConnectorTaskStatus(connectorName)); + + String msg3 = "{\"id\":3,\"col1\":\"col1\",\"col2\":\"col2\"}"; + produceMsg2Kafka(topic, msg3); + Thread.sleep(10000); + // msg3 is not consumed + checkResult(expected, query1, 3); + } + @AfterClass public static void closeInstance() { kafkaContainerService.deleteKafkaConnector(connectorName); diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java index dc2e6ae..a99e144 100644 --- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java +++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -47,6 +48,7 @@ public class TestDorisSinkService { private DorisDefaultSinkService dorisDefaultSinkService; private JsonConverter jsonConverter = new JsonConverter(); + private Properties 
props = new Properties(); @Before public void init() throws IOException { @@ -54,7 +56,6 @@ public class TestDorisSinkService { this.getClass() .getClassLoader() .getResourceAsStream("doris-connector-sink.properties"); - Properties props = new Properties(); props.load(stream); DorisSinkConnectorConfig.setDefaultValues((Map) props); props.put("task_id", "1"); @@ -170,4 +171,68 @@ public class TestDorisSinkService { ConnectException.class, () -> dorisDefaultSinkService.checkTopicMutating(record2)); } + + @Test + public void shouldSkipRecordTest() { + // default + SinkRecord record1 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + "{\"id\":1,\"name\":\"bob\",\"age\":12}", + 1); + Assert.assertFalse(dorisDefaultSinkService.shouldSkipRecord(record1)); + SinkRecord record2 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA) + .optional() + .build(), + null, + 1); + Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record2)); + + // ignore value + props.put("behavior.on.null.values", "ignore"); + dorisDefaultSinkService = + new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class)); + SinkRecord record3 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA) + .optional() + .build(), + null, + 1); + Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record3)); + + // fail value + props.put("behavior.on.null.values", "fail"); + dorisDefaultSinkService = + new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class)); + SinkRecord record4 = + new SinkRecord( + "topic_test", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA) + .optional() + .build(), + null, + 1); + Assert.assertThrows( + "Null valued record from topic topic_test, 
partition 0 and offset 1 was failed (the configuration property 'behavior.on.null.values' is 'FAIL').", + DataException.class, + () -> dorisDefaultSinkService.shouldSkipRecord(record4)); + } } diff --git a/src/test/resources/e2e/string_converter/null_values_default.json b/src/test/resources/e2e/string_converter/null_values_default.json new file mode 100644 index 0000000..9ca212f --- /dev/null +++ b/src/test/resources/e2e/string_converter/null_values_default.json @@ -0,0 +1,23 @@ +{ + "name":"null_values_default_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"behavior_on_null_values_test", + "tasks.max":"1", + "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg", + "enable.2pc": "false", + "load.model":"stream_load", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/null_values_fail.json b/src/test/resources/e2e/string_converter/null_values_fail.json new file mode 100644 index 0000000..b12628f --- /dev/null +++ b/src/test/resources/e2e/string_converter/null_values_fail.json @@ -0,0 +1,24 @@ +{ + "name":"null_values_fail_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"behavior_on_null_values_test", + "tasks.max":"1", + "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + 
"doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg", + "enable.2pc": "false", + "load.model":"stream_load", + "behavior.on.null.values":"fail", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/null_values_ignore.json b/src/test/resources/e2e/string_converter/null_values_ignore.json new file mode 100644 index 0000000..05b8e89 --- /dev/null +++ b/src/test/resources/e2e/string_converter/null_values_ignore.json @@ -0,0 +1,24 @@ +{ + "name":"null_values_ignore_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"behavior_on_null_values_test", + "tasks.max":"1", + "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg", + "enable.2pc": "false", + "load.model":"stream_load", + "behavior.on.null.values":"ignore", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/null_values_tab.sql b/src/test/resources/e2e/string_converter/null_values_tab.sql new file mode 100644 index 0000000..22749d5 --- /dev/null +++ b/src/test/resources/e2e/string_converter/null_values_tab.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. 
+CREATE TABLE string_msg.null_values_tab ( + id INT NULL, + col1 VARCHAR(20) NULL, + col2 varchar(20) NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org